Closed-loop acquisition and perturbation with pycro-manager

An example of closed-loop experimentation enabled by pycro-manager

When imaging live biological samples, we often have specific features of cellular activity we are interested in, such as a pattern of neural activity or a stage in the cell cycle. We can interrogate these dynamics with closed-loop (CL) experimental design. CL perturbations are triggered by signals derived from data acquired from the sample itself during a live recording session. Recent advancements in computing allow experimenters to conduct closed-loop experiments, which will deeply influence optical physiology, allowing real-time adaptation to animal state, enforcement of physiological constraints on evoked patterns, calibrated control with cellular resolution, and a variety of important experimental controls that were previously inaccessible (Grosenick, Marshel, and Deisseroth 2016 Neuron). Specifically, CL experiments:

  • ensure perturbation occurs during statistically rare conditions

  • allow online tuning of optogenetic inputs in vivo (to achieve specific output parameters)

  • allow online system identification / modeling of neural circuits (i.e. causally establish functional circuit architecture)

  • steer the system into desired or otherwise non-observable states

  • eliminate off-target effects of non-closed-loop perturbations

  • reduce variability of system state at time of stimulus onset

In this example we use features of pycro-manager that enable closed-loop experimentation. Specifically, we perform some canonical image processing (template filtering with a 2D Gaussian kernel, thresholding, median filtering), find local peaks, and take a window of pixel values around each peak. We use these pixel values to trigger an arbitrary “stimulus” function, which can e.g. change optical settings on the microscope, call a separate program, etc.
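As a rough illustration of this kind of smooth/threshold/peak-find pipeline (a simplified pure-NumPy sketch, not the ImageProcessor module used below, and omitting the median-filter step):

```python
import numpy as np


def gaussian_kernel(size=5, sigma=1.0):
    """Normalized 2D Gaussian kernel for template filtering."""
    ax = np.arange(size) - size // 2
    xx, yy = np.meshgrid(ax, ax)
    k = np.exp(-(xx ** 2 + yy ** 2) / (2 * sigma ** 2))
    return k / k.sum()


def convolve2d(img, kernel):
    """Direct 2D convolution with edge padding (fine for a sketch)."""
    kh, kw = kernel.shape
    ph, pw = kh // 2, kw // 2
    padded = np.pad(img, ((ph, ph), (pw, pw)), mode="edge")
    out = np.zeros(img.shape, dtype=np.float64)
    for i in range(kh):
        for j in range(kw):
            out += kernel[i, j] * padded[i:i + img.shape[0], j:j + img.shape[1]]
    return out


def find_peaks(img, threshold):
    """Smooth, threshold, then keep pixels that are local maxima of their
    3x3 neighborhood; returns a list of (row, col) peak coordinates."""
    smoothed = convolve2d(img.astype(np.float64), gaussian_kernel())
    mask = smoothed > threshold
    peaks = []
    h, w = img.shape
    for y in range(1, h - 1):
        for x in range(1, w - 1):
            if mask[y, x] and smoothed[y, x] == smoothed[y - 1:y + 2, x - 1:x + 2].max():
                peaks.append((y, x))
    return peaks
```

From each returned peak one can then slice out a small pixel window, as the real processing function below does with a 3x3 grid.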

Here we use snap_image() to acquire our images, both for readability and to show an example of headless pycro-manager acquisition. Alternatively, one could use pycro-manager Acquisitions to run the closed-loop experiment. We also leverage a few neat tricks:

  • we strobe our imaging acquisition by introducing a small delay between images. This makes snap_image() timing an order of magnitude more accurate, reflects a common imaging condition for perturbative experiments, and gives our closed-loop processing algorithm time to perform its computation.

  • we use the Python package numba to just-in-time compile our closed-loop computation (via LLVM intermediate representation) to machine code. This affords an order-of-magnitude speedup, as numba-compiled numerical algorithms can approach the speed of C.
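The strobe trick above amounts to scheduling each snap against an absolute deadline rather than sleeping a fixed amount after each frame, so per-iteration jitter does not accumulate as drift. A minimal stand-alone sketch (with a hypothetical `work_fn` standing in for snap-and-process):

```python
import time


def strobed_loop(n_frames, interframe_interval_ms, work_fn):
    """Run work_fn once per interval, sleeping until an absolute deadline
    each iteration; returns how many iterations overran the interval."""
    interval = interframe_interval_ms / 1000.0
    next_call = time.time()
    overruns = 0
    for f in range(n_frames):
        work_fn(f)                 # e.g. snap image, quantify, maybe stimulate
        next_call += interval      # advance the absolute deadline
        remaining = next_call - time.time()
        if remaining < 0:
            overruns += 1          # processing exceeded the inter-frame interval
        else:
            time.sleep(remaining)
    return overruns
```

The acquisition loop later in this notebook uses exactly this pattern, printing a warning whenever processing overruns the inter-frame interval.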

By Raymond L. Dunn, the FOCO Lab, UC San Francisco

code

load pycro-manager objects and parameters

[1]:
# simple single image acquisition example with snap
import time

import matplotlib.pyplot as plt
import numpy as np

from pycromanager import Bridge

#### Setup ####
bridge = Bridge()
core = bridge.get_core()

#### imaging settings
exposure = 20
num_frames_to_capture = 100
core.set_exposure(exposure)

#### strobe settings
# by enforcing a strobe (a small delay between acquisitions), our snap_image acquisition framerate becomes an order of magnitude more accurate (as of 20201006)
interframe_interval = 50
assert interframe_interval > exposure

#### holder variables for images, model values, and processing timestamps
frames = []
model = []
acq_timestamps = []
process_timestamps = []

define an image quantification function

[2]:
# make sure you have this module downloaded and in the appropriate directory so you can import it
# you might have to install some other python dependencies
import ImageProcessorFOCO as ImageProcessor


# define your image processing function
# in this case we're doing some image processing, finding local peaks, and taking a 3x3 grid of pixel values from each peak
# this function returns whether or not to trigger stimulation
def process_frame(frame, ip, is_demo=False):

    # if we're running this example with the micromanager demo config, peakfinding doesn't really make sense on gratings
    if is_demo:
        return 0

    # simple peakfinding algorithm from accompanying module
    xys_list = ip.segmentchunk(frame.astype(np.float32))

    # if no peaks, return placeholder value
    if len(xys_list) == 0:
        return 0

    # grab 3x3 pixels around each peak
    pix = []
    xys = np.array(xys_list) - 1  # -1 because of single pixel offset bug...
    pix.append(frame[xys[:, 0], xys[:, 1]])
    pix.append(frame[xys[:, 0], xys[:, 1] - 1])
    pix.append(frame[xys[:, 0], xys[:, 1] + 1])
    pix.append(frame[xys[:, 0] - 1, xys[:, 1]])
    pix.append(frame[xys[:, 0] - 1, xys[:, 1] - 1])
    pix.append(frame[xys[:, 0] - 1, xys[:, 1] + 1])
    pix.append(frame[xys[:, 0] + 1, xys[:, 1]])
    pix.append(frame[xys[:, 0] + 1, xys[:, 1] - 1])
    pix.append(frame[xys[:, 0] + 1, xys[:, 1] + 1])

    # flatten and sort peak-averages
    peak_averages = np.sort(np.array(pix).mean(axis=0).flatten())

    # in this example let's just average across peaks
    avg = peak_averages.mean()

    return avg


#### quantification settings
# initialize JIT precompilation with an initial image from the microscope
ip = ImageProcessor.ImageProcessor()
core.snap_image()
tagged_image = core.get_tagged_image()
frame = np.reshape(
    tagged_image.pix, newshape=[tagged_image.tags["Height"], tagged_image.tags["Width"]]
)
garbage = process_frame(frame, ip)

define a function for how your real-time quantified data triggers e.g. a microfluidic solenoid or a laser

[3]:
# for this demo we have a dummy function (it's over 9000 lol)
def process_model(model):

    threshold = 9000
    if model[-1] > threshold:

        # code here to do whatever perturbation you want
        pass

    return
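The dummy above fires whenever the latest model value crosses a fixed threshold. In practice one often adds a refractory period so a single sustained event does not retrigger the perturbation on every frame; a hedged sketch (the threshold and refractory length here are arbitrary illustrations, not values from this experiment):

```python
def make_trigger(threshold=9000, refractory_frames=10):
    """Return a stateful trigger function: fires when the newest model value
    exceeds threshold, then stays silent for refractory_frames calls."""
    state = {"cooldown": 0}

    def trigger(model):
        if state["cooldown"] > 0:
            state["cooldown"] -= 1      # still in refractory period
            return False
        if model and model[-1] > threshold:
            state["cooldown"] = refractory_frames
            return True                  # caller performs the actual perturbation
        return False

    return trigger
```

A closure keeps the cooldown state between frames without needing a class or a global variable.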

run acquisition. iteratively take frames, quantify, and check for stimulation trigger

[4]:
#### do acquisition
print("beginning acquisition...")
t0 = time.time()
next_call = time.time()  # updated periodically for when to take next image
for f in range(num_frames_to_capture):

    # snap image
    core.snap_image()
    tagged_image = core.get_tagged_image()

    # save acquisition time timestamp
    t1 = time.time()
    acq_timestamps.append(time.time() - t0)

    # pixels by default come out as a 1D array. We can reshape them into an image
    frame = np.reshape(
        tagged_image.pix,
        newshape=[tagged_image.tags["Height"], tagged_image.tags["Width"]],
    )

    # quantify image and save processing time timestamp
    val = process_frame(frame, ip, is_demo=True)
    process_timestamps.append(time.time() - t1)

    # store latest value in model and conditionally trigger perturbation
    model.append(val)
    process_model(model)

    # helpful printout to monitor progress
    if f % 50 == 0:
        print("current frame: {}".format(f))

    # wait until we're ready to snap next image. note that in this example, the first few images may exceed the strobe delay as numba jit compiles the relevant python functions
    nowtime = time.time()
    next_call = next_call + interframe_interval / 1000
    if next_call - nowtime < 0:
        print(
            "warning: strobe delay exceeded inter-frame-interval on frame {}.".format(f)
        )
    else:
        time.sleep(next_call - nowtime)

print("done!")
[5]:
print("thanks for reading!")
thanks for reading!