Computational quantitative phase imaging from focal stacks

In this tutorial we will use Pycro-Manager to compute 2D quantitative phase images from collected focal stacks, without the need for specialized optics, by using computational imaging. Specifically, we will solve and inverse problem based on the Transport of Intensity Equation (TIE). There are multiple ways of setting up and solving this inverse problem. In this example we will demonstrate how to solve it using exponentially-spaced Z-planes and a Gaussian process regression solver.

The inverse problem solving code used in this notebook is a translation of Matlab code that can be found here


Part 1: Setup

Create that functions that will be used to solve the inverse problems

[1]:
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from numpy import fft
from scipy.io import loadmat

# import tensorflow_probability as tfp
from scipy.optimize import fsolve

"""
intensities: defocused intensity stack, np array
z_vec: positions of images
lambda: wavelength
ps: pixel size
zfocus1: focal plane
Nsl: # of samples in Fourier space
"""


def GP_TIE(Ividmeas, z_vec, lambd, ps, zfocus, Nsl=100, eps1=1, eps2=1, reflect=False):

    # code expects 2D arrays
    if len(z_vec.shape) == 1:
        z_vec = z_vec[:, None]
    if isinstance(ps, float):
        ps = np.array([[ps]])
    elif len(ps.shape) == 1:
        ps = ps[:, None]

    RePhase1 = RunGaussianProcess(
        Ividmeas, zfocus, z_vec, lambd, ps, Nsl, eps1, eps2, reflect
    )
    RePhase1 = RePhase1 / np.mean(Ividmeas)
    #     print("rephase1: ", RePhase1)
    #     print("norm: ", np.mean(Ividmeas))
    return RePhase1


def RunGaussianProcess(Ividmeas, zfocus, z_vec, lambd, ps, Nsl, eps1, eps2, reflect):

    (Nx, Ny, Nz) = Ividmeas.shape
    I0 = Ividmeas[:, :, zfocus]
    zfocus = z_vec[zfocus]

    ### Calculate S_c ###

    # why is dz=1
    freqs = CalFrequency(Ividmeas[:, :, 0], lambd, ps, 1)
    max_freq = np.max(freqs)
    max_freq = np.sqrt(max_freq / (lambd / 2))
    freq_cutoff = np.linspace(0, 1, Nsl) * max_freq
    freq_cutoff = freq_cutoff ** 2 * lambd / 2

    SigmafStack = np.zeros((Nsl, 1))
    SigmanStack = np.zeros((Nsl, 1))
    SigmalStack = np.zeros((Nsl, 1))

    freq_to_sc = np.linspace(1.2, 1.1, Nsl)
    p = Nz / (np.max(z_vec) - np.min(z_vec))

    # Figure out GP regression

    for k in range(Nsl):
        Sigman = 10.0 ** -9
        Sigmaf = 1.0

        f1 = freq_cutoff[k]
        sc = f1 * freq_to_sc[k]
        a = sc ** 2 * 2 * np.pi ** 2
        b = np.log((p * (2 * np.pi) ** 0.5) / Sigman)

        def fu2(x):
            return a * np.exp(x) - 0.5 * x - b

        x = fsolve(fu2, 5)
        Sigmal = np.exp(x)

        SigmafStack[k] = Sigmaf
        SigmanStack[k] = Sigman
        SigmalStack[k] = Sigmal
    # print("SigmafStack: ", SigmafStack)
    # print("SigmanStack: ", SigmanStack)
    # print("SigmalStack: ", SigmalStack)

    dIdzStack = np.zeros((Nx, Ny, Nsl))
    CoeffStack = np.zeros((Nz, Nsl))
    Coeff2Stack = np.zeros((Nz, Nsl))
    for k in range(Nsl):
        Sigmal = SigmalStack[k]
        Sigman = SigmanStack[k]
        Sigmaf = SigmafStack[k]

        ### GP Regression step
        dIdz, Coeff, Coeff2 = GPRegression(
            Ividmeas, zfocus, z_vec, Sigmaf, Sigmal, Sigman
        )
        #         print("dIdz: ", dIdz)
        dIdzStack[:, :, k] = 2 * np.pi / lambd * ps ** 2 * dIdz
        CoeffStack[:, k] = Coeff
        Coeff2Stack[:, k] = Coeff2

    dIdzC = CombinePhase(dIdzStack, freq_cutoff, freqs, CoeffStack, Coeff2Stack)

    # print("dIdzStack: ", dIdzStack)
    # print("CoeffStack: ", CoeffStack)
    # print("Coeff2Stack: ", Coeff2Stack)
    ### poisson solver

    Del2_Psi_xy = (-2 * np.pi / lambd) * dIdzC

    N = dIdzC.shape[0]
    Psi_xy = poisson_solve(Del2_Psi_xy, ps, eps1, 0, reflect)

    #     print("Psi_xy: ", Psi_xy)

    Grad_Psi_x, Grad_Psi_y = np.gradient(Psi_xy / ps)
    Grad_Psi_x = Grad_Psi_x / (I0 + eps2)
    Grad_Psi_y = Grad_Psi_y / (I0 + eps2)
    #     print("Grad_Psi_x: ", Grad_Psi_x.shape)

    grad2x, _ = np.gradient(Grad_Psi_x / ps)
    _, grad2y = np.gradient(Grad_Psi_y / ps)
    Del2_Psi_xy = grad2x + grad2y
    #     print("Del2_Psi_xy: ", Del2_Psi_xy.shape)

    Phi_xy = poisson_solve(Del2_Psi_xy, ps, eps1, 1, reflect)
    #     print("Phi_xy: ", Phi_xy.shape)

    dcval = (
        np.sum(Phi_xy[:, 0])
        + np.sum(Phi_xy[0, :])
        + np.sum(Phi_xy[N - 1, :])
        + np.sum(Phi_xy[:, N - 1])
    ) / (4 * N)

    RePhase = -1 * (Phi_xy - dcval)
    # print("dIdzC: ", dIdzC.shape)
    # print("Del2_Psi_xy: ", Del2_Psi_xy.shape)
    # print("Phi_xy: ", Phi_xy.shape)
    # print("dcval: ", dcval.shape)
    # print("rephase: ", RePhase.shape)
    return RePhase


def CalFrequency(img, lambd, ps, dz):
    (nx, ny) = img.shape

    dfx = 1 / nx / ps
    dfy = 1 / ny / ps

    (Kxdown, Kydown) = np.mgrid[-nx // 2 : nx // 2, -ny // 2 : ny // 2]

    Kxdown = Kxdown * dfx
    Kydown = Kydown * dfy

    freqs = lambd * np.pi * (Kxdown ** 2 + Kydown ** 2)

    # normalized for sampling step and GP Regression ?
    freqs = freqs * dz / (2 * np.pi)

    return freqs


def CombinePhase(dIdzStack, Frq_cutoff, freqs, CoeffStack, Coeff2Stack):
    def F(x):
        return fft.ifftshift(fft.fft2(fft.fftshift(x)))

    def Ft(x):
        return fft.ifftshift(fft.ifft2(fft.fftshift(x)))

    Nx, Ny, Nsl = dIdzStack.shape

    dIdzC_fft = np.zeros((Nx, Ny))
    Maskf = np.zeros((Nx, Ny))

    f0 = 0
    f1 = 1

    for k in range(Nsl):
        dIdz = dIdzStack[:, :, k]
        dIdz_fft = F(dIdz)

        f1 = Frq_cutoff[k]
        Maskf = np.zeros((Nx, Ny))
        Maskf[np.argwhere((freqs <= f1) & (freqs > f0))] = 1
        f0 = f1
        dIdzC_fft = dIdzC_fft + (dIdz_fft * Maskf)

    return np.real(Ft(dIdzC_fft))


def poisson_solve(func, ps, eps, symm, reflect):
    N = len(func)

    if reflect != 0:
        N = N * 2
        func = np.hstack([func, np.fliplr(func)])
        func = np.vstack([func, np.flipud(func)])

    wx = 2 * np.pi * np.arange(0, N, 1) / N
    fx = 1 / (2 * np.pi * ps) * (wx - np.pi * (1 - N % 2 / N))
    [Fx, Fy] = np.meshgrid(fx, fx)
    func_ft = np.fft.fftshift(np.fft.fft2(func))

    Psi_ft = func_ft / (-4 * np.pi ** 2 * (Fx ** 2 + Fy ** 2 + eps))
    if symm:
        Psi_xy = np.fft.irfft2(np.fft.ifftshift(Psi_ft)[:, 0 : N // 2 + 1])
    else:
        Psi_xy = np.fft.ifft2(np.fft.ifftshift(Psi_ft))

    if reflect != 0:
        N = N // 2
        Psi_xy = np.array(Psi_xy)[:N, :N]
    #     print("Psi_ft: ", Psi_ft.shape, "Psi_xy: ", Psi_xy.shape)
    return Psi_xy


def mrdivide(A, B):
    # Solves A / B or xA = B
    return A.dot(np.linalg.pinv(B))


def GPRegression(Ividmeas, zfocus, z, Sigmaf, Sigmal, Sigman):
    Nx, Ny, Nz = Ividmeas.shape
    ones = np.ones((Nz, 1))
    KZ = ones.dot(z.T) - z.dot(ones.T)
    #     print("z: ", z)

    K = Sigmaf * (np.exp(-1 / 2 / Sigmal * (KZ ** 2)))
    L = np.linalg.cholesky(K + (Sigman * np.eye(Nz))).T  # why multiplying by I
    z2 = zfocus
    #     print("zfocus: ", zfocus)

    Nz2 = len(z2)
    ones2 = np.ones((Nz2, 1))
    KZ2 = ones * (z2.T) - z * (ones2.T)
    #     print("KZ2: ", KZ2)
    # print("KZ2 stuff: ", ones, z2, z, ones2)

    D = Sigmaf * (np.exp((-1 / 2 / Sigmal) * (KZ2 ** 2))) / -Sigmal * KZ2
    # print("D: ", D)
    # print("KZ2: ", KZ2)
    # print("sigmaf: ", Sigmaf)
    # print("sigmal: ", Sigmal)
    # return
    Coeff = mrdivide(mrdivide(D.T, L), L.T)[0]  # D.T/L/L.T
    # print("D: ", D)
    # print("L: ", L)
    # print("Coeff: ", Coeff)
    D2 = Sigmaf * (np.exp((-1 / 2 / Sigmal) * (KZ2 ** 2)))
    Coeff2 = mrdivide(mrdivide(D2.T, L), L.T)  # D2.T/L/L.T

    dIdz = np.zeros((Nx, Ny))

    for k in range(Nz):
        dIdz = dIdz + Ividmeas[:, :, k].dot(Coeff[k])
    #     print(k)
    #     print(Ividmeas[:,:,k])
    #     print(Coeff[k])
    #     print(Ividmeas[:,:,k].dot(Coeff[k]))
    # print("dIdz: ", dIdz)

    return dIdz, Coeff, Coeff2

Test the function using simulated data

Test the functions using simulated data, which can be accessed here

[2]:
test_path = "phase_rec_GUI/datasets/moustache_man_stack.mat"

data = loadmat(test_path)
Ividmeas = data["Istack"]
z_vec = np.ravel(data["zvec"])

lambd = data["lambda"][0][0]
ps = data["ps"]
zfocus = 1
Nsl = 100

phase = GP_TIE(Ividmeas.astype(np.float), np.ravel(z_vec), lambd, ps, zfocus)
# print("phase: ", phase)
plt.imshow(phase)
# plt.hist(np.ravel(phase))
plt.show()
../_images/application_notebooks_pycro_manager_tie_demo_4_0.png

Part 2: Implement as a Pycro-manager image processor

be sure to turn debug mode off when running on actual hardware. This example will NOT work in non-debug mode with the micro-manager demo config, because an array of all zeros will be output

[3]:
import copy

import numpy as np

from pycromanager import Acquisition, multi_d_acquisition_events

planes_per_z_stack = 5
lambd = 6.328e-07  # wavelength of the illumination light, in meters
debug = True  # compute the phase image form test data in previous cell

# This image processor will run each time an image is acquired. If the image is the last one
# in the z-stack, the inverse problem will be solved, and the result added to the GUI. Otherwise
# the image will be accumulated into a temporary list
def img_process_fn(image, metadata):

    # accumulate images as they come
    if not hasattr(img_process_fn, "images"):
        img_process_fn.images = []
        img_process_fn.z_positions = []

    # add pixels and z position
    img_process_fn.images.append(image)
    img_process_fn.z_positions.append(metadata["ZPosition_um_Intended"])

    if metadata["Axes"]["z"] == planes_per_z_stack - 1:
        # its the final one in the z stack

        z_positions = np.array(img_process_fn.z_positions)
        images = np.stack(img_process_fn.images, axis=2).astype(np.float)

        # the z position that is solved for -- assume this is the median of the z-stack (i.e. its symmetrical)
        solved_plane_index = np.argmin(np.abs(z_positions - np.median(z_positions)))

        if debug:
            # debugging -- send in the test data instead
            phase_img = GP_TIE(
                Ividmeas.astype(np.float), np.ravel(z_vec), lambd, ps, zfocus
            )
        else:
            phase_img = GP_TIE(
                images,
                z_positions,
                lambd,
                1e-6 * metadata["PixelSizeUm"],
                solved_plane_index,
            )

        # rescale to 16 bit, since the viewer doesn't accept 32 bit floats
        phase_img = (
            ((phase_img - np.min(phase_img)) / (np.max(phase_img) - np.min(phase_img)))
            * (2 ** 16 - 1)
        ).astype(">u2")

        # create new metadta to go along with this phase image
        phase_image_metadata = copy.deepcopy(metadata)
        # make it appear as a new channel
        phase_image_metadata["Channel"] = "Phase"
        # Put it the z index closest to the solved plane
        phase_image_metadata["Axes"]["z"] = solved_plane_index

        # reset in case multiple z-stacks
        img_process_fn.images = []
        img_process_fn.z_positions = []

        # return the original image and the phase image, in a new channel
        return [(image, metadata), (phase_img, phase_image_metadata)]
    else:
        return image, metadata


img_process_fn.images = []
img_process_fn.z_positions = []

with Acquisition(
    directory="/path/to/save", name="acq_name", image_process_fn=img_process_fn
) as acq:
    # Generate the events for a single z-stack
    events = []
    for index, z_um in enumerate(np.linspace(0, 10, planes_per_z_stack)):
        evt = {"axes": {"z": index}, "z": z_um}
        events.append(evt)
    acq.acquire(events)