Digital filters for live signal processing in Python
Digital filters are commonplace in biosignal processing. And the SciPy library offers a strong digital signal processing (DSP) ecosystem that is exceptionally well documented and easy to use with offline data. However, there is shockingly little material online on DSP in Python for real-time applications. In a live graphical interface (like yarppg), the signal needs to be processed while it is being generated - one sample at a time.
In this post, I am showing two different implementations of digital filters, that can be used in a real-time setting. DSP is a vast topic - the theory behind it involves many topics that I only understand superficially: trigonometry, linear time-invariant (LTI) systems, transformations to the frequency spectrum domain, etc… I will not try to explain these concepts but rather focus on the application in Python. A great resource to learn more about DSP is dsprelated.com.
This post covers the following topics:
- live implementation of digital IIR filters equivalent to SciPy’s
lfilter
- adding live filters to the code from the earlier post on yarppg
- brief intro to second-order sections (SOS filters)
- live implementation of digital IIR filters equivalent to SciPy’s
sosfilter
Live filter implementations in Python
I covered some of the basics on digital filters in my previous post, where I showed an example application of an infinite impulse response (IIR) filter from the SciPy library. Filter design and application is conveniently achieved in only two lines of code:
|
|
In a real-time setting, the incoming signal needs to be processed one sample at a time. So we want a representation of the filter where the following code is equivalent to the scipy filter functions:
|
|
Base class for live filter implementations
As I am going to show two different implementations, I am defining a base class, that takes over some of the boilerplate. Arguably, it is not that much anyway, but this approach should improve extendibility:
|
|
By implementing the __call__
method, we can have the filter behave like a
function while still being able to track the filter state as instance variables.
Difference equation implementation: lfilter
-equivalent
A digital filter can be described using a difference equation. It looks like
the following and defines the new output value $y[n]$ as a weighted sum of past
inputs $x[n-k]$ and outputs $y[n-k]$:
$$
a_0 y[n] = b_0 x[n] + b_1 x[n-1] + b_2 x[n-2] - a_1 y[n-1] - a_2 y[n-2]
$$
Once we have obtained the coefficients $b_k$ and $a_k$ from a filter design
function (like scipy.signal.iirfilter
), we can use the above equation to
implement the filter. The filter class (LiveLFilter
) needs to keep track of
the most recent inputs and outputs to calculate the new output.
The constructor looks as follows:
|
|
Since we only need to track a moving window of input and output values, it
is convenient to use a double-ended queue. The Python standard
library provides an implementation of this data structure
(collections.deque
). The queues holding past x and y values
are restricted to only hold as many values as needed in the difference equation.
Then, whenever a value is pushed to the full queue, the oldest value is dropped.
In order to allow any filter order, and thus any length of coefficient vectors,
I am using numpy functions for multiplications and additions.
The difference equation can be rewritten as the difference of two dot products,
all that is left to do is to append x and y values to the correct side of the
queue.
Note that by appending values from the left, the coefficients $b_k$ and $a_k$
are properly aligned with the past inputs $x[n-k]$ and outputs $y[n-k]$.
If we used the more common append
method, we would need to reverse one of the
vectors inside the dot products.
|
|
Also note that filter coefficients are typically scaled so that $a_0=1$. Line 6 above could be omitted, but I want the function to allow any coefficients.
LiveLFilter
vs. SciPy’s lfilter
This was already the entire code for the LiveLFilter
class.
To see if the filter implementation works as expected, we can use the example
signal from the last post.
It is a 1 Hz sine wave overlaid with Gaussian noise.
|
|
Now, we can define and apply the IIR filter using with both the live implementation and the SciPy version. I am using the mean absolute error (MAE) metric from scikit-learn to quantify the difference.
|
|
lfilter error: 1.8117e-15
The MAE is very close to zero and there is no visual difference between the outputs of both filters:
|
|
Application of live filters in yarppg
In my post on heartbeat signal extraction, I recreated a
minimalistic version of yarppg, that obtained a rather noisy pulse
signal from the webcam input.
We can leverage the LiveLFilter
implementation from above to process the raw
pulse signal, reducing noise and removing lower-frequency drifts in the signal.
The filter type to remove both lower and higher frequencies at the same time is
called band-pass. It passes a specified frequency band while everything else
is rejected.
Extension of previous code
First, let’s adjust the RPPG
class to actually perform the filtering.
I add a filter_function
argument defaulting to None in the constructor.
If nothing is passed here, a do-nothing lambda function is created:
|
|
The provided filter function is then simply applied during on_frame_received
.
There is only one more line to adjust in RPPG
:
|
|
Now, all that is left is to define the band-pass filter and pass it to RPPG
in the main function. For convenient creation of the live filter, a helper
function is added to rppg.py
:
|
|
The LiveSosFilter
implementation, using second-order sections is discussed
in a later section.
Final application
The new main function, initializing the components and starting the user interface, only needs two additional lines:
|
|
Using the filter settings above, the extracted pulse signal is nicely smoothed and heart beats are more clearly visible. In the next post, we can tackle finding peaks in the pulse signal and computing an average heart rate.
Improving stability with second-order sections
According to the SciPy documentation, the function
scipy.signal.sosfilt
should be preferred over lfilter
in pretty much all filtering tasks.
This is because, in their normal form, IIR filters are susceptible to
coefficient quantization errors and digital implementations can run into
stability problems. 1
Second-order sections filter (biquads)
To combat the numerical problems of higher-order filters, an IIR filter can be broken down into a cascade of so-called second-order sections. Second-order sections (or biquads) can be designed in several different structures2. The SciPy library works with the transposed direct form II shown below. The $z^{-1}$ blocks denote a one-sample delay.
The signal flow chart can be read like this: The biquad output $y[n]$ is
calculated by adding $x[n]\cdot b_0$ and $w[n-1]$. Where $w[n-1]$ is the
result of the summation in the center one sample ago (delayed in the $z^{-1}$
block).
Any higher-order filter can be rewritten as a series of such biquads.
As a user, we really do not need to care all that much about the mathematics
behind this. A filter designed as cascaded biquads can be obtained with
scipy.signal.iirfiler
by setting the argument output="sos"
.
Live implementation of sosfilt
I am creating a second class derived from LiveFilter
. In the constructor,
we need to store the biquad coefficients and zero-initialize the state.
Additionally, we get the number of filter sections from the coefficients' shape:
|
|
It took me a while to get the code below working, after digging through the SciPy code and documentation as well as some other resources mentioned before (1, 2, 3 and 4). In essence, it’s actually less than six lines of code, but they are quite dense.
|
|
The for-loop iterates through all filter sections and computes the corresponding
section output $y$.
To make the code more readable and concise, the section coefficients are
extracted into separate variables. Following the signal flow chart from above,
the output $y$ is calculated as the sum of the current input $x$ (times $b_0$)
plus the previous state of the first delay block. The first delay block in turn
is given as the sum of three terms: $b_1 x$, $-a_1 y$ and the state of the
second delay. Finally, the second delay block receives $b_2 x - a_2 y$.
Note that delaying the signals occurs implicitly, as the previous values of
self.state
are accessed before they are computed for the current step.
Before moving on to the computation of the next filter section, $x$ is
overwritten by the biquad output. This imitates the next biquad being connected
directly to the previous output.
LiveSosFilter
vs. SciPy’s sosfilt
To make sure that the implementation works as intended, we can compare SciPy’s
sosfilt
output to the LiveSosFilter
. Using the same example signal as
above, we can simulate the
live setting and compute the MAE.
|
|
sosfilter error: 0
The output of the live implementation exactly matches SciPy’s implementation.
In order to use LiveSosFilter
instead of LiveLFilter
in the live GUI, we
only need to
replace the output argument in get_heartbeat_filter
to "sos"
. Although
arguably in this specific application, there should not be a notable difference.
|
|
Here is a link to download the entire code for this article with DownGit.
(References)
-
R. G. Lyons, “Infinite Impulse Response Filters,” Understanding Digital Signal Processing, 3rd ed. Pearson, pp. 253-360, 2011. ↩︎
-
https://ccrma.stanford.edu/~jos/filters/Direct_Form_II.html ↩︎