Streaming data from micro-manager to napari: PSF Viewer
If you use this tool, please cite:
pycro-manager: Pinkard, H., Stuurman, N., Ivanov, I.E. et al. Pycro-Manager: open-source software for customized and reproducible microscope control. Nat Methods (2021). doi: 10.1038/s41592-021-01087-6
napari: napari contributors (2019). napari: a multi-dimensional image viewer for python. doi: 10.5281/zenodo.3555620
This notebook shows how to acquire data using
micromanager
, then use pycro-manager
to stream it to napari
.Buttons to start and stop data acquisition are added to the
napari
window using the magic-gui
package. In this example, the data displayed in napari
is resliced to get a live PSF viewer. However, reslicing is only a small example for the data analysis possible using napari
.Here are two videos showing the PSF viewer in action:
- PSFViewer-ExternalStageControl_1080p.mp4: z-stage controlled via
micromanager
- PSFViewer-InternalStageControl_1080p.mp4: z-stage controlled via external DAQ control
Since the amount of data that can be transferred between
micromanager
and pycro-manager
is currently limited to 100 MB/s, it’s important that no more data is transferred to ensure smooth execution of the software.For both movies, camera acquisition parameters in
micromanager
were set to:- 11-bit depth,
- chip-size cropped to the central 512x512 px.
- external trigger start (trigger comming at 45 Hz)
- exposure time set to 0.01 ms
Tested on:
- macOS Catalina using
micromanager 2.0.0-gamma1-20210221
[1]:
# only execute first time to install all required packages
# has been tested with the indicated package versions
#!pip install pycromanager==0.10.9 napari==0.4.5 pyqt5==5.15.1 magicgui==0.2.5 yappi==1.3.2
# newest: magicgui==0.2.6, but there's an error message when connecting the buttons
# when updating pycromanager, you may have to update micro-manager as well
# when updating magicgui, napari may have to be updated
[2]:
import time
import numpy as np
import queue
#import yappi # needed for benchmarking multithreaded code
import napari
from napari.qt import thread_worker
from magicgui import magicgui
from pycromanager import Acquisition, multi_d_acquisition_events
# open napari in an extra window
%gui qt
define constants
some constants for microscope parameters
and display options
global variables for multithreading
[3]:
# data acquired on microscope or simulated?
simulate = False
# z-stage controlled through micromanager, or externally?
z_stack_external = False
# clip image to central part. Speeds up display as data size is reduced
# is used as size for simulating data
clip =[128, 128]
# um / px, for correct scaling in napari
size_um = [0.16, 0.16]
# start in um, end in um, number of slices, active slice
z_range = [0, 50, 200, 0]
#z_range = [1100, 1150, 200, 0]
# rescale z dimension independently for display
z_scale = 1
# sleep time to keep software responsive
sleep_time = 0.05
# contrast limits for display
clim = [100, 300]
# number of color channels, active channel
channels = [1, 0]
# color map for display
cmap = ['plasma', 'viridis']
# layer names for the channels
layer_names = ['GFP', 'RFP']
# initialize global variables
# flag to break while loops
acq_running = False
# empty queue for image data and z positions
img_queue = queue.Queue()
# xyz data stack
data = np.random.rand(z_range[2], clip[0], clip[1]) * clim[1]
# if z-stage is controlled through micromanager:
# need bridge to move stage at beginning of stack
# USE WITH CAUTION: only tested with micromanager demo config
if not(simulate) and not(z_stack_external):
from pycromanager import Core
#get object representing micro-manager core
core = Core()
print(core)
core.set_position(z_range[0])
<pycromanager.core.mmcorej_CMMCore object at 0x7fe4f020adf0>
dev_names = core.get_loaded_devices() for ii in range(dev_names.size()): print(ii, dev_names.get(ii))
print(core.get_property(“Camera”, “PixelType”)) #print(core.get_property(“Z”, “Label”))
stage_xy = core.get_xy_stage_position() pos = [stage_xy.get_x(), stage_xy.get_y()] print(pos) core.set_position(100) print(‘z stage: ‘, core.get_position()) core.stop(‘Z’) # this doesnt work, just continues moving print(‘z stage: ‘, core.get_position())
core.set_position(z_range[0]) # this also doesn’t work time.sleep(5) print(‘z stage: ‘, core.get_position())
Function to write data into Queue
This function is shared by the image acquisition / simulation routine.
Shapes data as needed and keeps track of both z_position and active channel.
[4]:
def place_data(image):
""" fnc to place image data into the queue.
Keeps track of z-position in stacks and of active color channels.
Inputs: np.array image: image data
Global variables: image_queue to write image and z position
z_range to keep track of z position
channels to keep track of channels
"""
global img_queue
global z_range
global channels
img_queue.put([channels[1], z_range[3], np.ravel(image)])
z_range[3] = (z_range[3]+1) % z_range[2]
if z_range[3] == 0:
channels[1] = (channels[1]+1) % channels[0]
#print(z_range, channels)
create dummy image and and put into stack
creates dummy image of constant brightness
use for testing purposes without microscope
stack of increasing brightness helps to identify glitches
[5]:
def simulate_image(b, size = [128,128]):
""" fnc to simulate an image of constant brightness
and call fnc to place it into the queue.
Inputs: int b: brightness
np.array size: # of px in image in xy.
"""
place_data(np.ones(size) * b)
def simulate_data(ii, z_range):
""" fnc to create images with constant, but increasing brightness.
Inputs: int ii: counter to increase brightness
int z_range: number of slices in stack"""
for zz in range(z_range[2]):
brightness = (ii+1) * (zz+1) / ((z_range[2]+1)) * clim[1]
simulate_image(brightness, clip)
time.sleep(sleep_time)
# need sleep time especially when simulated datasize is small or this will kill CPU
image process function and pycromanager acquisition
grabs and clips acquired image
built pycromanager acquisition events
acquire data and send to image_process_fn
[6]:
def grab_image(image, metadata):
""" image_process_fnc to grab image from uManager, clip it to central part
and call the fnc that will put it into the queue.
Inputs: array image: image from micromanager
metadata from micromanager
"""
size = np.shape(image)
image_clipped = image[(size[0]-clip[0])//2:(size[0]+clip[0])//2,
(size[1]-clip[1])//2:(size[1]+clip[1])//2]
place_data(image_clipped)
return image, metadata
def acquire_data(z_range):
""" micro-manager data acquisition. Creates acquisition events for z-stack.
This example: use custom events, not multi_d_acquisition because the
z-stage is not run from micro-manager but controlled via external DAQ."""
with Acquisition(directory=None, name=None,
show_display=True,
image_process_fn = grab_image) as acq:
events = []
for index, z_um in enumerate(np.linspace(z_range[0], z_range[1], z_range[2])):
evt = {"axes": {"z_ext": index}, "z_ext": z_um}
events.append(evt)
acq.acquire(events)
def acquire_multid(z_range):
""" micro-manager data acquisition. Creates acquisition events for z-stack.
This example: use multi_d_acquisition because the z-stage is run
from micro-manager.
Unless hardware triggering is set up in micro-manager, this will be fairly slow:
micro-manager does not sweep the z-stage, but acquires plane by plane. """
with Acquisition(directory=None, name=None,
show_display=False,
image_process_fn = grab_image) as acq:
events = multi_d_acquisition_events(z_start=z_range[0], z_end=z_range[1],
z_step=(z_range[1]-z_range[0])/(z_range[2]-1))
acq.acquire(events)
napari update display
is called whenever the thread worker checking the queue yields an image
adds images into xyz stack and updates data
[7]:
def display_napari(pos_img):
""" Unpacks z position and reshapes image from pos_img. Writes image into correct
slice of data, and updates napari display.
Called by worker thread yielding elements from queue.
Needs to be in code before worker thread connecting to it.
Inputs: array pos_img: queue element containing z position and raveled image data.
Global variables: np.array data: contains image stack
img_queue: needed only to send task_done() signal.
"""
global data
global img_queue
if pos_img is None:
return
# read image and z position
image = np.reshape(pos_img[2:],(clip[0], clip[1]))
z_pos = pos_img[1]
color = pos_img[0]
# write image into correct slice of data and update display
data[z_pos] = np.squeeze(image)
layer = viewer.layers[color]
layer.data = data
#print("updating ", z_pos, color)
img_queue.task_done()
worker threads appending data to queue and reading from queue
[8]:
@thread_worker
def append_img(img_queue):
""" Worker thread that adds images to a list.
Calls either micro-manager data acquisition or functions for simulating data.
Inputs: img_queue """
# start microscope data acquisition
if not simulate:
if z_stack_external:
while acq_running:
acquire_data(z_range)
time.sleep(sleep_time)
else:
while acq_running:
acquire_multid(z_range)
time.sleep(sleep_time)
# run with simulated data
else:
ii = 0
while acq_running:
simulate_data(ii, z_range)
ii = ii + 1
#print("appending to queue", ii)
time.sleep(sleep_time)
@thread_worker(connect={'yielded': display_napari})
def yield_img(img_queue):
""" Worker thread that checks whether there are elements in the
queue, reads them out.
Connected to display_napari function to update display """
global acq_running
while acq_running:
time.sleep(sleep_time)
# get elements from queue while there is more than one element
# playing it safe: I'm always leaving one element in the queue
while img_queue.qsize() > 1:
#print("reading from queue ", img_queue.qsize())
yield img_queue.get(block = False)
# read out last remaining elements after end of acquisition
while img_queue.qsize() > 0:
yield img_queue.get(block = False)
print("acquisition done")
define functions to start and stop acquisition
connect to gui buttons using magic_gui
start_acq
restarts workers, resets acq_running
flag and resets z_range[3]
, ie z_posstop_acq
sets acq_running
flag to False
, which will stop the worker threads[9]:
@magicgui(call_button = "Start")
def start_acq():
""" Called when Start button in pressed. Starts workers and resets global variables"""
print("starting threads...")
global acq_running
global z_range
if not(acq_running):
z_range[3] = 0
acq_running = True
# comment in when benchmarking
#yappi.start()
worker1 = append_img(img_queue)
worker2 = yield_img(img_queue)
worker1.start()
#worker2.start() # doesn't need to be started bc yield is connected
else:
print("acquisition already running!")
@magicgui(call_button = "Stop")
def stop_acq():
print("stopping threads")
# set global acq_running to False to stop other workers
global acq_running
global core
acq_running = False
if not(simulate) and not(z_stack_external):
print('z stage stopping: ', core.get_position())
core.stop("Z") # this doesnt work, just continues moving. eventually micromanager memory overflows
print('z stage stopped: ', core.get_position())
core.set_position(z_range[0]) # this also doesn't work
core.wait_for_device("Z")
#time.sleep(5)
print('z stage zeroed: ', core.get_position())
# comment in when benchmarking
# yappi.stop()
“Main” function: start napari and worker threads
(re-)opens napary viewer
initializes view with random data
sets scale, contrast etc and rolls view. add GUI buttons for start stop
there’s a glitch when acquisition is stopped and restarted too quickly
[10]:
# check if viewer is already open
# if yes: close and reopen
try:
if viewer:
viewer.close()
except:
print("viewer already closed or never opened")
viewer = napari.Viewer(ndisplay=2)
# initialize napari viewer with stack view and random data, reslice view
scale = [(z_range[1]-z_range[0])/z_range[2]*z_scale, size_um[1], size_um[0]]
layers = [viewer.add_image(data,
name = layer_names[c],
colormap = cmap[c],
interpolation = 'nearest',
blending = 'additive',
rendering = 'attenuated_mip',
scale = scale,
contrast_limits = clim)
for c in range(channels[0])]
viewer.dims._roll()
# set sliders to the middle of the stack for all three dimensions.
# doesn't work anymore after fixing scaling
# would have to be done for both layers
#for dd, dim in enumerate(layers[0].data.shape):
# viewer.dims.set_point(dd, dim*scale[2-dd]//2)
# add start stop buttons to napari gui
viewer.window.add_dock_widget(start_acq)
viewer.window.add_dock_widget(stop_acq)
viewer already closed or never opened
[10]:
<napari._qt.widgets.qt_viewer_dock_widget.QtViewerDockWidget at 0x7fe4f5cce790>
starting threads...
stopping threads
acquisition donez stage stopping: 37.688442211055275
z stage stopped: 37.939698492462306
z stage zeroed: 38.19095477386934
stopping threads
z stage stopping: 34.42211055276382
z stage stopped: 34.67336683417085
z stage zeroed: 0
stopping threads
z stage stopping: 17.336683417085425
z stage stopped: 17.336683417085425
z stage zeroed: 0
stopping threads
Get output from yappi
only needs to be run when benchmarking code
[ ]:
print('z stage zeroed: ', core.get_position())
[ ]:
#only needs to be executed when yappi is used
threads = yappi.get_thread_stats()
for thread in threads:
print(
"Function stats for (%s) (%d)" % (thread.name, thread.id)
) # it is the Thread.__class__.__name__
yappi.get_func_stats(ctx_id=thread.id).print_all()