Source code for planfftw.planners

# -*- coding: utf-8 -*-

from math import ceil, log2

import numpy as np

from . import _utils
try:
    import pyfftw
    from ._pyfftw_basics import (fft, fft_pair, rfft, rfft_pair,
                                 fftn, fftn_pair, rfftn, rfftn_pair,
                                 ifft, ifft_pair, irfft, irfft_pair,
                                 ifftn, ifftn_pair, irfftn, irfftn_pair)
except ImportError:
    try:
        import scipy.fftpack as _
        from ._scipy_basics import (fft, fft_pair, rfft, rfft_pair,
                                    fftn, fftn_pair, rfftn, rfftn_pair,
                                    ifft, ifft_pair, irfft, irfft_pair,
                                    ifftn, ifftn_pair, irfftn, irfftn_pair)
    except ImportError:
        from ._numpy_basics import (fft, fft_pair, rfft, rfft_pair,
                                    fftn, fftn_pair, rfftn, rfftn_pair,
                                    ifft, ifft_pair, irfft, irfft_pair,
                                    ifftn, ifftn_pair, irfftn, irfftn_pair)


# ### Hidden functions

def _convolve_slice(mode, len_x, len_y):
    if mode == "valid":
        return slice(len_y - 1, len_x)
    elif mode == "same":
        return slice((len_y - 1)//2, len_x + (len_y - 1)//2)
    elif mode == "full":
        return slice(0, len_x + len_y - 1)
    else:
        raise ValueError("Invalid mode: {} (must be one of 'valid', "
                         "'same' or 'full'".format(mode))


def _nd_preparations(x_shape, y_shape, mode, correlation):
    """Slices and FFT functions for N-dimensional convolution/correlation."""
    n_dimensions_x = len(x_shape)
    n_dimensions_y = len(y_shape)
    if not n_dimensions_x == n_dimensions_y:
        fn_type = "correlation" if correlation is True else "convolution"
        raise ValueError("Cannot plan {} for inputs of shape "
                         "{} and {}".format(fn_type, x_shape, y_shape))
    max_shape = np.max((x_shape, y_shape), axis=0)
    nfft = tuple(np.power(2, np.ceil(np.log2(2*max_shape - 1))).astype(int))
    rev_slices = tuple([slice(None, None, -1)]*n_dimensions_y)
    out_slices = tuple(_convolve_slice(mode, len_x, len_y)
                       for len_x, len_y in zip(x_shape, y_shape))

    axes = list(range(n_dimensions_x))
    planned_rfftn, planned_irfftn = \
        rfftn_pair(tuple(max_shape), nfft, axes=axes)

    return rev_slices, out_slices, planned_rfftn, planned_irfftn


def _1d_perparations(x_shape, y_shape, axis, mode):
    """Wrappers, slices and FFT functions for 1-D convolution/correlation."""

    n_dimensions_x = len(x_shape)
    n_dimensions_y = len(y_shape)
    n_dimensions_out = max(n_dimensions_x, n_dimensions_y)

    if axis is None:
        axis = -1

    if n_dimensions_y == 1 and not n_dimensions_x == 1:
        input_shape_ = list(x_shape)
        input_shape_[axis] = max(input_shape_[axis], y_shape[0])
        x_wrapper = _utils.id

        def y_wrapper(y_fft):
            return _utils.expand_me(y_fft, axis, n_dimensions_x)

        x_rfft_, y_rfft_ = (0, 1)

    elif n_dimensions_x == 1 and not n_dimensions_y == 1:
        input_shape_ = list(y_shape)
        input_shape_[axis] = max(input_shape_[axis], x_shape[0])
        y_wrapper = _utils.id

        def x_wrapper(x_fft):
            return _utils.expand_me(x_fft, axis, n_dimensions_x)

        x_rfft_, y_rfft_ = (1, 0)

    else:
        input_shape_ = np.max((x_shape, y_shape), axis=0)
        x_wrapper, y_wrapper = (_utils.id, _utils.id)
        x_rfft_, y_rfft_ = (0, 0)

    planned_rffts = [None, None]
    input_shape = tuple(input_shape_)
    nfft = (int(2**ceil(log2(2*input_shape[axis] - 1))),)
    len_x = x_shape[0] if n_dimensions_x == 1 else x_shape[axis]
    len_y = y_shape[0] if n_dimensions_y == 1 else y_shape[axis]
    rev_slices = tuple(_utils.replace([slice(None)]*n_dimensions_y,
                                      axis,
                                      slice(None, None, -1)))
    out_slices = tuple(_utils.replace([slice(None)]*n_dimensions_out,
                                      axis,
                                      _convolve_slice(mode, len_x, len_y)))

    planned_rffts[0], planned_irfft = rfft_pair(input_shape,
                                                nfft[0],
                                                axis=axis)
    if not n_dimensions_x == n_dimensions_y:
        input_length = x_shape[0] if n_dimensions_x == 1 else y_shape[0]
        planned_rffts[1] = rfft(input_length, nfft[0])
    else:
        planned_rffts[1] = planned_rffts[0]

    return (x_wrapper,
            y_wrapper,
            rev_slices,
            out_slices,
            planned_rffts[x_rfft_],
            planned_rffts[y_rfft_],
            planned_irfft)


def _convolve_or_correlate(plan_x,
                           plan_y,
                           mode="full",
                           axis=None,
                           correlation=False):
    """Returns a planned function for computing the convolution of two
    sequences or arrays.

    Parameters
    ----------
    plan_x : integer, shape or array-like
        First input sequence, it's shape or length.
    plan_y : integer, shape or array-like
        Second input sequence, it's shape or length.
    mode : Optional[str]
        A string indicating the output size.

        'full'
            The output is the full discrete linear convolution of the
            inputs. (Default)
        'valid'
            The output consists only of those elements that do not rely on
            zero-padding.
        'same'
            The output is the same size as plan_x, centered with respect to
            the 'full' output.
    axis : Optional[int]
        The axis or axes along which the convolution is computed.
        Default is the last N axes of the highest-rank input where N is the
        rank of the lowest-rank input.
    correlation : Optional[bool]
        Indicates that correlation, rather than convolution, should be
        planned. Default is False.

    Returns
    -------
    planned_function(x_transformed, y_transformed)
        Planned convolution or correlation function.
    planned_x_transform_function(x)
        Planned transform function for x
    planned_y_transform_function(y)
        Planned transform function for y

    Raises
    ------
    ValueError
        If no function can be planned for the given arguments.
    """

    # Get dimensions
    x_shape = _utils.get_shape(plan_x)
    y_shape = _utils.get_shape(plan_y)

    if axis is None and min(len(x_shape), len(y_shape)) > 1:
        rev_slices, out_slices, x_rfft, planned_irfft = \
            _nd_preparations(x_shape, y_shape, mode, correlation)
        y_rfft = x_rfft
        x_wrapper = _utils.id
        y_wrapper = _utils.id

    else:
        (x_wrapper,
         y_wrapper,
         rev_slices,
         out_slices,
         x_rfft,
         y_rfft,
         planned_irfft) = _1d_perparations(x_shape, y_shape, axis, mode)

    def x_transform_function(x):
        return x_wrapper(x_rfft(x))

    if correlation:
        def y_transform_function(y):
            return y_wrapper(y_rfft(y[rev_slices]))
    else:
        def y_transform_function(y):
            return y_wrapper(y_rfft(y))

    def planned_function(x_fft, y_fft):
        return planned_irfft(x_fft*y_fft)[out_slices]

    return planned_function, x_transform_function, y_transform_function


# ### Public functions

[docs]def firfilter2(filter_coefficients, signal, axis=-1): """Returns a planned FIR-filter function. The planned function is returned along with separate input transform functions. This is intended for more advanced use cases, such as when precise control of transforms is required for memoization or other reasons. For most use cases, `firfilter`, `firfilter_const_filter` or `firfilter_const_signal` are more appropriate. Parameters ---------- filter_coefficients: Union[Array-like, int] An array of filter coefficients or a filter length signal: Union[Array-like, Tuple[int, int], int] A signal array, shape or length axis: int The axis of `signal` along which the filter shall be applied Returns planned_filter(b_transformed, x_transformed) The filtering function b_transform_function(b) The filter transform function x_transform_function(x) The signal transform function See also -------- firfilter, firfilter_const_filter, firfilter_const_signal """ filter_length = _utils.get_shape(filter_coefficients)[0] signal_shape = _utils.get_shape(signal) signal_length = signal_shape[axis] n_dim = len(signal_shape) fft_size = _utils.compute_filter_nfft(filter_length, signal_length) # ### Make pyfftw plans u_shape = tuple(_utils.replace(signal_shape, axis, fft_size)) rfft_function, irfft_function = rfft_pair(tuple(u_shape), fft_size, axis=axis) if n_dim > 1: rfft_function_filter = rfft(fft_size) else: rfft_function_filter = rfft_function segment_length = fft_size - filter_length + 1 padded_signal_shape = tuple(_utils.replace(signal_shape, axis, fft_size)) def filter_transform_function(b): return rfft_function_filter( np.hstack((b, np.zeros(fft_size - filter_length)))) def signal_transform_function(x): if not all(a == b for a, b in zip(x.shape, signal_shape)): raise ValueError("Invalid signal shape {}, function expects {}" .format(x.shape, signal_shape)) slices = [slice(None)] * n_dim segments = [] for k in range(0, signal_length, segment_length): k_to = np.min((k + segment_length, signal_length)) slices_ = tuple(_utils.replace(slices, axis, slice(k, k_to))) segments.append(rfft_function(_utils.pad_array(x[slices_], padded_signal_shape))) return segments def filter_function(transformed_filter, transformed_segments): y = np.zeros(signal_shape) segment_iterator = iter(transformed_segments) slices = [slice(None)]*n_dim for k in range(0, signal_length, segment_length): y_ = irfft_function(np.apply_along_axis( lambda x: x*transformed_filter, axis, next(segment_iterator))) y_to = np.min((signal_length, k + fft_size)) out_slices = tuple(_utils.replace(slices, axis, slice(k, y_to))) part_slices = \ tuple(_utils.replace(slices, axis, slice(0, y_to - k))) y[out_slices] += y_[part_slices].real return y return (filter_function, filter_transform_function, signal_transform_function)
[docs]def firfilter(plan_b, plan_x, axis=-1): """Returns a planned function that filters a signal using the frequency domain overlap-and-add method with pyfftw. Parameters ---------- plan_b : integer, shape or array-like A filter vector or its length or shape plan_x : integer, shape or array-like A signal vector or its length or shape axis : optional[int] Axis in plan_x along which the filter is applied Returns ------- planned_filter(b, x) The filtering function Raises ------ ValueError If filter has more than one dimensions. """ (filter_function, filter_transform_function, signal_transform_function) = firfilter2(plan_b, plan_x, axis=axis) def planned_firfilter(b, x): return filter_function(filter_transform_function(b), signal_transform_function(x)) return planned_firfilter
[docs]def firfilter_const_filter(b, plan_x, axis=-1): """Returns a planned function that filters a signal using the frequency domain overlap-and-add method with pyfftw. Parameters ---------- b : array-like A filter vector plan_x : integer, shape or array-like A signal vector or its length or shape axis : optional[int] Axis in plan_x along which the filter is applied Returns ------- planned_firfilter(x) The planned FIR-filtering function. Raises ------ ValueError If filter has more than one dimensions. """ (filter_function, filter_transform_function, signal_transform_function) = firfilter2(b, plan_x, axis=axis) b_ = filter_transform_function(b) def planned_firfilter(x): return filter_function(b_, signal_transform_function(x)) return planned_firfilter
[docs]def firfilter_const_signal(plan_b, x, axis=-1): """Returns a planned function that filters a signal using the frequency domain overlap-and-add method with pyfftw. Parameters ---------- plan_b : integer, shape or array-like A filter vector or its length or shape x : Array-like A signal vector or array axis : optional[int] Axis in plan_x along which the filter is applied Returns ------- planned_firfilter(b) The planned FIR-filtering function. Raises ------ ValueError If filter has more than one dimensions. """ (filter_function, filter_transform_function, signal_transform_function) = firfilter2(plan_b, x, axis=axis) x_ = signal_transform_function(x) def planned_firfilter(b): return filter_function(filter_transform_function(b), x_) return planned_firfilter
[docs]def convolve2(plan_x, plan_y, mode='full', axis=None): """Plan a function for convolving two arrays. The planned function is returned along with separate input transform functions. This is intended for more advanced use cases, such as when precise control of transforms is required for memoization or other reasons. For most use cases, `convolve`, `convolve_const_x` or `convolve_const_y` are more appropriate. This planner distinguishes between several different cases: 1. 1-D convolution of 1-D sequences or along single axis in N-D arrays 2. N-D convolution of N-D sequences 3. 1-D convolution along single axis in N-D array with 1-D sequence 4. 1-D convolution of 1-D sequence along single axis in N-D array The shapes of the two inputs are usually sufficient to determine which case should be used. However, to get 1-D convolution along single axis in N-D arrays, 'axes' must be a single integer or a length 1 sequence. Parameters ---------- plan_x : integer, shape or array-like First input sequence, it's shape or length. plan_y : integer, shape or array-like Second input sequence, it's shape or length. mode : Optional[str] A string indicating the output size. 'full' The output is the full discrete linear convolution of the inputs. (Default) 'valid' The output consists only of those elements that do not rely on zero-padding. 'same' The output is the same size as plan_x, centered with respect to the 'full' output. axis : Optional[int or sequence of ints] The axis or axes along which the convolution is computed. Default is the last N axes of the highest-rank input where N is the rank of the lowest-rank input. Returns ------- planned_convolve(x_transformed, y_transformed) Planned convolution function. x_transform_function(x) Planned transform function for first sequence y_transform_function(y) Planned transform function for second sequence Raises ------ ValueError If no function can be planned for the given arguments. See also -------- convolve, convolve_const_x, convolve_const_y """ return _convolve_or_correlate(plan_x, plan_y, mode=mode, axis=axis)
[docs]def convolve(plan_x, plan_y, mode='full', axis=None): """Plan a function for convolving two arrays. This planner distinguishes between several different cases: 1. 1-D convolution of 1-D sequences or along single axis in N-D arrays 2. N-D convolution of N-D sequences 3. 1-D convolution along single axis in N-D array with 1-D sequence 4. 1-D convolution of 1-D sequence along single axis in N-D array The shapes of the two inputs are usually sufficient to determine which case should be used. However, to get 1-D convolution along single axis in N-D arrays, 'axes' must be a single integer or a length 1 sequence. Parameters ---------- plan_x : integer, shape or array-like First input sequence, it's shape or length. plan_y : integer, shape or array-like Second input sequence, it's shape or length. mode : Optional[str] A string indicating the output size. 'full' The output is the full discrete linear convolution of the inputs. (Default) 'valid' The output consists only of those elements that do not rely on zero-padding. 'same' The output is the same size as plan_x, centered with respect to the 'full' output. axis : Optional[int or sequence of ints] The axis or axes along which the convolution is computed. Default is the last N axes of the highest-rank input where N is the rank of the lowest-rank input. Returns ------- planned_convolve(x, y) Planned convolution function. Raises ------ ValueError If no function can be planned for the given arguments. """ planned_function, x_transform_function, y_transform_function = \ convolve2(plan_x, plan_y, mode=mode, axis=axis) def convolve_function(x, y): return planned_function(x_transform_function(x), y_transform_function(y)) return convolve_function
[docs]def convolve_const_x(x, plan_y, mode='full', axis=None): """Plan a function for convolving a constant array with another array. This planner distinguishes between several different cases: 1. 1-D convolution of 1-D sequences or along single axis in N-D arrays 2. N-D convolution of N-D sequences 3. 1-D convolution along single axis in N-D array with 1-D sequence 4. 1-D convolution of 1-D sequence along single axis in N-D array The shapes of the two inputs are usually sufficient to determine which case should be used. However, to get 1-D convolution along single axis in N-D arrays, 'axes' must be a single integer or a length 1 sequence. Parameters ---------- x: Array-like First input (constant) plan_y: integer, shape or array-like Second input or it's shape or length. mode: Optional[str] A string indicating the output size. 'full' The output is the full discrete linear convolution of the inputs. (Default) 'valid' The output consists only of those elements that do not rely on zero-padding. 'same' The output is the same size as plan_x, centered with respect to the 'full' output. axis: Optional[int or sequence of ints] The axis or axes along which the convolution is computed. Default is the last N axes of the highest-rank input where N is the rank of the lowest-rank input. Returns ------- planned_convolve(y) Planned convolution function. Raises ------ ValueError If no function can be planned for the given arguments. """ planned_function, x_transform_function, y_transform_function = \ convolve2(x, plan_y, mode=mode, axis=axis) x_fft = x_transform_function(x) def convolve_function(y): return planned_function(x_fft, y_transform_function(y)) return convolve_function
[docs]def convolve_const_y(plan_x, y, mode='full', axis=None): """Plan a function for convolving an array with another constant array. This planner distinguishes between several different cases: 1. 1-D convolution of 1-D sequences or along single axis in N-D arrays 2. N-D convolution of N-D sequences 3. 1-D convolution along single axis in N-D array with 1-D sequence 4. 1-D convolution of 1-D sequence along single axis in N-D array The shapes of the two inputs are usually sufficient to determine which case should be used. However, to get 1-D convolution along single axis in N-D arrays, 'axes' must be a single integer or a length 1 sequence. Parameters ---------- plan_x: integer, shape or Array-like First input or it's shape or length. y: Array-like Second input (constant) mode: Optional[str] A string indicating the output size. 'full' The output is the full discrete linear convolution of the inputs. (Default) 'valid' The output consists only of those elements that do not rely on zero-padding. 'same' The output is the same size as plan_x, centered with respect to the 'full' output. axis: Optional[int or sequence of ints] The axis or axes along which the convolution is computed. Default is the last N axes of the highest-rank input where N is the rank of the lowest-rank input. Returns ------- planned_convolve(x) Planned convolution function. Raises ------ ValueError If no function can be planned for the given arguments. """ planned_function, x_transform_function, y_transform_function = \ convolve2(plan_x, y, mode=mode, axis=axis) y_fft = y_transform_function(y) def convolve_function(x): return planned_function(x_transform_function(x), y_fft) return convolve_function
[docs]def correlate2(plan_x, plan_y, mode='full', axis=None): """Plan a function for correlating two arrays. The planned function is returned along with separate input transform functions. This is intended for more advanced use cases, such as when precise control of transforms is required for memoization or other reasons. For most use cases, `correlate`, `correlate_const_x` or `correlate_const_y` are more appropriate. This planner distinguishes between several different cases: 1. 1-D correlation of 1-D sequences or along single axis in N-D arrays 2. N-D correlation of N-D sequences 3. 1-D correlation along single axis in N-D array with 1-D sequence 4. 1-D correlation of 1-D sequence along single axis in N-D array The shapes of the two inputs are usually sufficient to determine which case should be used. However, to get 1-D correlation along single axis in N-D arrays, 'axes' must be a single integer or a length 1 sequence. Parameters ---------- plan_x : integer, shape or array-like First input sequence, it's shape or length. plan_y : integer, shape or array-like Second input sequence, it's shape or length. mode : Optional[str] A string indicating the output size. 'full' The output is the full discrete linear correlation of the inputs. (Default) 'valid' The output consists only of those elements that do not rely on zero-padding. 'same' The output is the same size as plan_x, centered with respect to the 'full' output. axis : Optional[int or sequence of ints] The axis or axes along which the correlation is computed. Default is the last N axes of the highest-rank input where N is the rank of the lowest-rank input. Returns ------- planned_correlate(x_transformed, y_transformed) Planned correlation function x_transform_function(x) Planned transform function for the first array y_transform_function(y) Planned transform function for the second array Raises ------ ValueError If no function can be planned for the given arguments. """ return _convolve_or_correlate(plan_x, plan_y, mode=mode, axis=axis, correlation=True)
[docs]def correlate(plan_x, plan_y, mode='full', axis=None): """Plan a function for correlating two arrays. This planner distinguishes between several different cases: 1. 1-D correlation of 1-D sequences or along single axis in N-D arrays 2. N-D correlation of N-D sequences 3. 1-D correlation along single axis in N-D array with 1-D sequence 4. 1-D correlation of 1-D sequence along single axis in N-D array The shapes of the two inputs are usually sufficient to determine which case should be used. However, to get 1-D correlation along single axis in N-D arrays, 'axes' must be a single integer or a length 1 sequence. Parameters ---------- plan_x : integer, shape or array-like First input sequence, it's shape or length. plan_y : integer, shape or array-like Second input sequence, it's shape or length. mode : Optional[str] A string indicating the output size. 'full' The output is the full discrete linear correlation of the inputs. (Default) 'valid' The output consists only of those elements that do not rely on zero-padding. 'same' The output is the same size as plan_x, centered with respect to the 'full' output. axis : Optional[int or sequence of ints] The axis or axes along which the correlation is computed. Default is the last N axes of the highest-rank input where N is the rank of the lowest-rank input. Returns ------- planned_correlate(x, y) Planned correlation function. Raises ------ ValueError If no function can be planned for the given arguments. """ planned_function, x_transform_function, y_transform_function = \ correlate2(plan_x, plan_y, mode=mode, axis=axis) def correlate_function(x, y): return planned_function(x_transform_function(x), y_transform_function(y)) return correlate_function
[docs]def correlate_const_x(x, plan_y, mode='full', axis=None): """Plan a function for correlating a constant array with another array. This planner distinguishes between several different cases: 1. 1-D correlation of 1-D sequences or along single axis in N-D arrays 2. N-D correlation of N-D sequences 3. 1-D correlation along single axis in N-D array with 1-D sequence 4. 1-D correlation of 1-D sequence along single axis in N-D array The shapes of the two inputs are usually sufficient to determine which case should be used. However, to get 1-D correlation along single axis in N-D arrays, 'axes' must be a single integer or a length 1 sequence. Parameters ---------- x: Array-like First input (constant) plan_y: integer, shape or array-like Second input or it's shape or length. mode: Optional[str] A string indicating the output size. 'full' The output is the full discrete linear correlation of the inputs. (Default) 'valid' The output consists only of those elements that do not rely on zero-padding. 'same' The output is the same size as plan_x, centered with respect to the 'full' output. axis: Optional[int or sequence of ints] The axis or axes along which the correlation is computed. Default is the last N axes of the highest-rank input where N is the rank of the lowest-rank input. Returns ------- planned_correlate(y) Planned correlation function. Raises ------ ValueError If no function can be planned for the given arguments. """ planned_function, x_transform_function, y_transform_function = \ correlate2(x, plan_y, mode=mode, axis=axis) x_fft = x_transform_function(x) def correlate_function(y): return planned_function(x_fft, y_transform_function(y)) return correlate_function
[docs]def correlate_const_y(plan_x, y, mode='full', axis=None): """Plan a function for correlating an array with another constant array. This planner distinguishes between several different cases: 1. 1-D correlation of 1-D sequences or along single axis in N-D arrays 2. N-D correlation of N-D sequences 3. 1-D correlation along single axis in N-D array with 1-D sequence 4. 1-D correlation of 1-D sequence along single axis in N-D array The shapes of the two inputs are usually sufficient to determine which case should be used. However, to get 1-D correlation along single axis in N-D arrays, 'axes' must be a single integer or a length 1 sequence. Parameters ---------- x: Array-like First input (constant) plan_y: integer, shape or array-like Second input or it's shape or length. mode: Optional[str] A string indicating the output size. 'full' The output is the full discrete linear correlation of the inputs. (Default) 'valid' The output consists only of those elements that do not rely on zero-padding. 'same' The output is the same size as plan_x, centered with respect to the 'full' output. axis: Optional[int or sequence of ints] The axis or axes along which the correlation is computed. Default is the last N axes of the highest-rank input where N is the rank of the lowest-rank input. Returns ------- planned_correlate(x) Planned correlation function. Raises ------ ValueError If no function can be planned for the given arguments. """ planned_function, x_transform_function, y_transform_function = \ correlate2(plan_x, y, mode=mode, axis=axis) y_fft = y_transform_function(y) def correlate_function(x): return planned_function(x_transform_function(x), y_fft) return correlate_function