Friday, March 9, 2012

Healing holes in arrays in Python: interpolation vs. inpainting

I had solved that interpolation task I was working on previously, by the way. I post my approach here, in case someone finds it useful.

If the bad pixel regions are big, standard methods of interpolation are not applicable. Interpolation works under assumption that the regions are contiguous, whereas this is not necessarily the case. Besides, none of interpolation implementations in python seem to work with masked arrays, and I've tried a metric ton of them. miceuz, who was sitting on this task half night as well, asked about it on stackoverflow, and someone mentioned the right keyword: inpainting.

That's still inventing the data, but the option is having unquantifiable errors in photometry if we leave out the bad pixels or don't mask the bad areas at all.

OpenCV is a huge piece of software, and using it to fill those holes is tantamount to using a hammer to crack a nut. However, I reused this tiny piece of code, adding a [hardcoded ,)] Gaussian weights kernel as I needed to use inverse distance weighting for interpolation. Here's the code:

import numpy as np
import pyfits
from scipy import ndimage
import inpaint

maskedImg =, mask = cutoutMask)
NANMask =  maskedImg.filled(np.NaN)

badArrays, num_badArrays = sp.ndimage.label(cutoutMask)
print num_badArrays
data_slices = sp.ndimage.find_objects(badArrays)

filled = inpaint.replace_nans(NANMask, 5, 0.5, 2, 'iwd')
hdu = pyfits.PrimaryHDU(filled)

inpaint module boils down to this -- I used the code linked above, just lobotomised the Cython:

# -*- coding: utf-8 -*-

"""A module for various utilities and helper functions"""

import numpy as np
#cimport numpy as np
#cimport cython

DTYPEf = np.float64
#ctypedef np.float64_t DTYPEf_t

DTYPEi = np.int32
#ctypedef np.int32_t DTYPEi_t

#@cython.boundscheck(False) # turn of bounds-checking for entire function
#@cython.wraparound(False) # turn of bounds-checking for entire function
def replace_nans(array, max_iter, tol,kernel_size=1,method='localmean'):
    """Replace NaN elements in an array using an iterative image inpainting algorithm.
The algorithm is the following:
1) For each element in the input array, replace it by a weighted average
of the neighbouring elements which are not NaN themselves. The weights depends
of the method type. If ``method=localmean`` weight are equal to 1/( (2*kernel_size+1)**2 -1 )
2) Several iterations are needed if there are adjacent NaN elements.
If this is the case, information is "spread" from the edges of the missing
regions iteratively, until the variation is below a certain threshold.
array : 2d np.ndarray
an array containing NaN elements that have to be replaced
max_iter : int
the number of iterations
kernel_size : int
the size of the kernel, default is 1
method : str
the method used to replace invalid values. Valid options are
`localmean`, 'idw'.
filled : 2d np.ndarray
a copy of the input array, where NaN elements have been replaced.
#    cdef int i, j, I, J, it, n, k, l
#    cdef int n_invalids
    filled = np.empty( [array.shape[0], array.shape[1]], dtype=DTYPEf)
    kernel = np.empty( (2*kernel_size+1, 2*kernel_size+1), dtype=DTYPEf )
#    cdef np.ndarray[np.int_t, ndim=1] inans
#    cdef np.ndarray[np.int_t, ndim=1] jnans
    # indices where array is NaN
    inans, jnans = np.nonzero( np.isnan(array) )
    # number of NaN elements
    n_nans = len(inans)
    # arrays which contain replaced values to check for convergence
    replaced_new = np.zeros( n_nans, dtype=DTYPEf)
    replaced_old = np.zeros( n_nans, dtype=DTYPEf)
    # depending on kernel type, fill kernel array
    if method == 'localmean':
        print 'kernel_size', kernel_size       
        for i in range(2*kernel_size+1):
            for j in range(2*kernel_size+1):
                kernel[i,j] = 1
        print kernel, 'kernel'

    elif method == 'idw':
        kernel = np.array([[0, 0.5, 0.5, 0.5,0],
                  [0, 0.5, 0.5 ,0.5 ,0]])
        print kernel, 'kernel'      
        raise ValueError( 'method not valid. Should be one of `localmean`.')
    # fill new array with input elements
    for i in range(array.shape[0]):
        for j in range(array.shape[1]):
            filled[i,j] = array[i,j]

    # make several passes
    # until we reach convergence
    for it in range(max_iter):
        print 'iteration', it
        # for each NaN element
        for k in range(n_nans):
            i = inans[k]
            j = jnans[k]
            # initialize to zero
            filled[i,j] = 0.0
            n = 0
            # loop over the kernel
            for I in range(2*kernel_size+1):
                for J in range(2*kernel_size+1):
                    # if we are not out of the boundaries
                    if i+I-kernel_size < array.shape[0] and i+I-kernel_size >= 0:
                        if j+J-kernel_size < array.shape[1] and j+J-kernel_size >= 0:
                            # if the neighbour element is not NaN itself.
                            if filled[i+I-kernel_size, j+J-kernel_size] == filled[i+I-kernel_size, j+J-kernel_size] :
                                # do not sum itself
                                if I-kernel_size != 0 and J-kernel_size != 0:
                                    # convolve kernel with original array
                                    filled[i,j] = filled[i,j] + filled[i+I-kernel_size, j+J-kernel_size]*kernel[I, J]
                                    n = n + 1*kernel[I,J]
                                    print n

            # divide value by effective number of added elements
            if n != 0:
                filled[i,j] = filled[i,j] / n
                replaced_new[k] = filled[i,j]
                filled[i,j] = np.nan
        # check if mean square difference between values of replaced
        #elements is below a certain tolerance
        print 'tolerance', np.mean( (replaced_new-replaced_old)**2 )
        if np.mean( (replaced_new-replaced_old)**2 ) < tol:
            for l in range(n_nans):
                replaced_old[l] = replaced_new[l]
    return filled

def sincinterp(image, x,  y, kernel_size=3 ):
    """Re-sample an image at intermediate positions between pixels.
This function uses a cardinal interpolation formula which limits
the loss of information in the resampling process. It uses a limited
number of neighbouring pixels.
The new image :math:`im^+` at fractional locations :math:`x` and :math:`y` is computed as:
.. math::
im^+(x,y) = \sum_{i=-\mathtt{kernel\_size}}^{i=\mathtt{kernel\_size}} \sum_{j=-\mathtt{kernel\_size}}^{j=\mathtt{kernel\_size}} \mathtt{image}(i,j) sin[\pi(i-\mathtt{x})] sin[\pi(j-\mathtt{y})] / \pi(i-\mathtt{x}) / \pi(j-\mathtt{y})
image : np.ndarray, dtype np.int32
the image array.
x : two dimensions np.ndarray of floats
an array containing fractional pixel row
positions at which to interpolate the image
y : two dimensions np.ndarray of floats
an array containing fractional pixel column
positions at which to interpolate the image
kernel_size : int
interpolation is performed over a ``(2*kernel_size+1)*(2*kernel_size+1)``
submatrix in the neighbourhood of each interpolation point.
im : np.ndarray, dtype np.float64
the interpolated value of ``image`` at the points specified
by ``x`` and ``y``
    # indices
#    cdef int i, j, I, J
    # the output array
    r = np.zeros( [x.shape[0], x.shape[1]], dtype=DTYPEf)
    # fast pi
    pi = 3.1419
    # for each point of the output array
    for I in range(x.shape[0]):
        for J in range(x.shape[1]):
            #loop over all neighbouring grid points
            for i in range( int(x[I,J])-kernel_size, int(x[I,J])+kernel_size+1 ):
                for j in range( int(y[I,J])-kernel_size, int(y[I,J])+kernel_size+1 ):
                    # check that we are in the boundaries
                    if i >= 0 and i <= image.shape[0] and j >= 0 and j <= image.shape[1]:
                        if (i-x[I,J]) == 0.0 and (j-y[I,J]) == 0.0:
                            r[I,J] = r[I,J] + image[i,j]
                        elif (i-x[I,J]) == 0.0:
                            r[I,J] = r[I,J] + image[i,j] * np.sin( pi*(j-y[I,J]) )/( pi*(j-y[I,J]) )
                        elif (j-y[I,J]) == 0.0:
                            r[I,J] = r[I,J] + image[i,j] * np.sin( pi*(i-x[I,J]) )/( pi*(i-x[I,J]) )
                            r[I,J] = r[I,J] + image[i,j] * np.sin( pi*(i-x[I,J]) )*np.sin( pi*(j-y[I,J]) )/( pi*pi*(i-x[I,J])*(j-y[I,J]))
    return r
#cdef extern from "math.h":
 #   double sin(double)
The thing that's bugging me is that this algorithm doesn't traverse around the edges of the hole, rather, it indexes the bad pixels incrementally. However, it does several iterations of smoothing, so maybe it's ok. At least, there are no visible spooky artifacts.

1 comment:

  1. Hi - I just wanted to say thank you for this post! I've been dealing with some patchy radar data, trying to build a digital elevation model for the Totten Ice Shelf in Antarctica. Your method healed the gaps in my data set beautifully and has save me a bunch of time, so thank you :)