Hazard

Signal Processing with Stateful Transducers

January 17, 2017

I finally sat down a couple of months ago to wrap my mind around transducers. Reading about transducers never gave me an intuitive feel for them, so when I had a signal processing problem this summer, it was a great opportunity to play with them and really understand how to use and compose them.

The transducers for map, filter, and keep are stateless transducers; the reducing functions they return don’t keep any state between calls. Several other transducers are stateful, including take, drop, map-indexed, keep-indexed, and distinct. Studying the source of the transducers in clojure.core is a great guide to writing your own. The transducers below were originally based on the pattern seen in distinct.
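A quick way to see that state in action is with the stateful transducers from clojure.core named above. In this small sketch, xf is defined once and reused; each call to into applies it to a fresh reducing function, so drop, take, and map-indexed start counting from scratch every time, and distinct starts with an empty set of seen values:

```clojure
;; drop and take each keep a countdown, map-indexed keeps an index,
;; and distinct keeps the set of values it has already passed on.
(def xf (comp (drop 1) (take 3) (map-indexed vector)))

(into [] xf (range 10))
;; => [[0 1] [1 2] [2 3]]

;; Reusing xf gives the same answer: its state is created per use.
(into [] xf (range 10))
;; => [[0 1] [1 2] [2 3]]

(into [] (distinct) [1 2 1 3 2])
;; => [1 2 3]
```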

To get the derivative of a series of values without a transducer, you could do this:


(defn derivative [xs]
  (map (fn [[a b]]
         (- b a))
       (partition 2 1 xs)))

(derivative [2 1 6 9 8])

The transducer version of derivative is more complicated, but illustrates a typical shape for a transducer:


(defn derivative [rf]
  (let [previous (atom nil)]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result item]
        (let [prev @previous]
          (reset! previous item)
          (if (nil? prev)
            result
            (rf result (- item prev))))))))

(sequence derivative [2 1 6 9 8])

The reducing function closes over an atom that holds its state, in this case the last value it has seen. Each time the reducing function is called, it saves the new value and passes the difference between the current value and the previous value on to the next reducing function. On the first value there is no previous value yet, so derivative passes nothing on.

Despite the extra code, the transducer version of derivative has advantages. The most common argument for transducers is that they avoid intermediate sequences, such as the sequence of pairs that partition creates in the non-transducer version. Skipping those allocations uses less memory, and each value flows through the pipeline as a chain of plain function calls.
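Because derivative is an ordinary transducer, the same definition works anywhere transducers are accepted, not just with sequence. This sketch repeats the definition so it runs on its own, then builds a vector with into and reduces the differences straight to a number with transduce; neither needs the intermediate pairs that partition produced:

```clojure
;; The transducer version of derivative from above.
(defn derivative [rf]
  (let [previous (atom nil)]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result item]
        (let [prev @previous]
          (reset! previous item)
          (if (nil? prev)
            result
            (rf result (- item prev))))))))

;; Collect the differences into a vector.
(into [] derivative [2 1 6 9 8])
;; => [-1 5 3 -1]

;; Sum the differences directly; the sum telescopes to last - first.
(transduce derivative + [2 1 6 9 8])
;; => 6
```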

Transducers tend to have a common shape. Here’s a transducer for applying a function to a sliding window of values. It has a similar structure to the derivative transducer:


(defn roll [n xs x]
  (if (< (count xs) n)
    (conj xs x)
    (conj (subvec xs 1) x)))

(defn sliding-window [n f]
  (fn [rf]
    (let [window (atom [])]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result item]
          (swap! window #(roll n % item))
          (if (< (count @window) n)
            result
            (rf result (f @window))))))))

(defn avg [xs]
  (/ (reduce + xs) (count xs)))

(sequence (sliding-window 2 avg)
          [2 1 6 9 8])

sliding-window is useful for taking moving averages, weighted moving averages, averages without outlying data points, exponential smoothing, and more.
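As a sketch of two of those uses, the hypothetical helpers weighted-avg and trimmed-avg below plug into sliding-window in place of avg. The weights [1 2 3] are an arbitrary choice that favors the newest values, and trimmed-avg assumes a window of at least three values. roll, sliding-window, and avg are repeated from above so the snippet runs on its own:

```clojure
(defn roll [n xs x]
  (if (< (count xs) n)
    (conj xs x)
    (conj (subvec xs 1) x)))

(defn sliding-window [n f]
  (fn [rf]
    (let [window (atom [])]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result item]
          (swap! window #(roll n % item))
          (if (< (count @window) n)
            result
            (rf result (f @window))))))))

(defn avg [xs]
  (/ (reduce + xs) (count xs)))

;; Hypothetical helper: weighted average where the i-th weight applies
;; to the i-th value in the window (oldest first).
(defn weighted-avg [weights]
  (fn [xs]
    (/ (reduce + (map * weights xs))
       (reduce + weights))))

;; Hypothetical helper: drop the smallest and largest values in the
;; window, then average the rest (needs a window of at least 3).
(defn trimmed-avg [xs]
  (avg (butlast (rest (sort xs)))))

(sequence (sliding-window 3 (weighted-avg [1 2 3])) [2 1 6 9 8])
;; => (11/3 20/3 8)

(sequence (sliding-window 3 trimmed-avg) [2 1 6 9 8])
;; => (2 6 8)
```

With a window of three, dropping the smallest and largest value leaves only the middle one, so trimmed-avg doubles as a moving median here.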

You can use transducer state to find points of interest in the incoming values, such as emitting only local maxima:


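(defn maxima [rf]
  ;; prev is the last value seen; rising? records whether the signal
  ;; was climbing (or just starting) when prev arrived
  (let [prev (atom nil)
        rising? (atom true)]
    (fn
      ([] (rf))
      ([result]
        ;; if the signal was still climbing at the end, the last value is a peak
        (let [p @prev]
          (rf (if (and (some? p) @rising?)
                (unreduced (rf result p))
                result))))
      ([result item]
        (let [p @prev
              r @rising?]
          (reset! prev item)
          (cond
            (nil? p) result                     ; first value: nothing to compare yet
            (> item p) (do (reset! rising? true) result)
            (< item p) (do (reset! rising? false)
                           (if r (rf result p) result)) ; p was a local maximum
            :else result))))))                  ; plateau: keep the current direction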

(sequence maxima [5 1 6 3 4 5 7 4 8])

With more state, we can track periodic cycles and their peaks. The following transducer emits the highest value of each cycle, where a cycle ends whenever the signal drops from above a given cutoff to below it:


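(defn peaks [cutoff]
  (fn [rf]
    ;; prev: last value seen; peak: highest value in the current cycle
    (let [prev (atom nil)
          peak (atom nil)]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result item]
          (cond
            ;; first value: seed both prev and peak
            (nil? @prev)
            (do
              (reset! prev item)
              (reset! peak item)
              result)

            ;; crossed down through cutoff: emit the finished cycle's
            ;; peak and start a new cycle at this value
            (> @prev cutoff item)
            (let [p @peak]
              (reset! peak item)
              (reset! prev item)
              (rf result p))

            ;; new high for the current cycle
            (< @peak item)
            (do
              (reset! peak item)
              (reset! prev item)
              result)

            :else
            (do (reset! prev item) result)))))))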

(sequence (peaks 0) [5 1 6 3 -1 -3 -6 -2 3 4 9 2 1 -6])

Stateful transducers tend to feel very imperative, but like any good functional programming construct, they neatly box the imperative logic inside a function, so the code that uses them doesn’t have to worry about state. They compose like any other transducer:


(sequence
  (comp
    derivative
    (sliding-window 2 avg)
    maxima)
  [5 1 6 3 -1 -3 -6 -2 3 4 9 2 1 -6])

Rarely will you find imperative constructs that compose so cleanly. As I was working with a noisy, periodic signal, it was simple to tinker with the chain of signal filters while trying to suss out the data I was looking for: inserting and commenting out transducers, then plotting the results to see how well they matched expectations. Try changing the transducers in the call to sequence above to see how the data transforms step by step.

The transducers I’ve given here work on single-value data points, not coordinate pairs, though it’s easy to modify them to carry the x-values along while operating on the y-values.
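For example, a hypothetical derivative-xy could destructure [x y] pairs, remember the previous pair, and emit each x alongside the slope since the previous point. This is only a sketch and assumes the x-values are strictly increasing, since equal x-values would divide by zero:

```clojure
(defn derivative-xy
  "Hypothetical variant of derivative for [x y] pairs: emits [x slope],
  where slope is the change in y over the change in x since the
  previous pair. Assumes strictly increasing x-values."
  [rf]
  (let [previous (atom nil)]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result [x y :as item]]
        (let [prev @previous]
          (reset! previous item)
          (if (nil? prev)
            result
            (let [[px py] prev]
              (rf result [x (/ (- y py) (- x px))]))))))))

(sequence derivative-xy [[0 2] [1 1] [3 7] [4 9]])
;; => ([1 -1] [3 3] [4 2])
```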

A good exercise in writing stateful transducers is to implement each Nelson rule as a transducer, emitting only those points that match the rule.
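As a starting point, here is one possible sketch of Nelson rule 2, which flags nine or more points in a row on the same side of the mean. The name same-side-run and the choice to emit every point from the nth one onward are assumptions, and the mean is passed in rather than estimated from the data. (same-side-run mean 9) matches the rule as stated; the example uses a run of 3 to keep the input short:

```clojure
(defn same-side-run
  "Hypothetical sketch of Nelson rule 2: passes through each point that
  is at least the nth consecutive point on the same side of mean.
  A point exactly on the mean breaks the run."
  [mean n]
  (fn [rf]
    (let [side (atom 0)   ; 1 above the mean, -1 below, 0 on it
          run (atom 0)]   ; length of the current same-side run
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result item]
          (let [s (cond (> item mean) 1 (< item mean) -1 :else 0)
                r (cond (zero? s) 0
                        (= s @side) (inc @run)
                        :else 1)]
            (reset! side s)
            (reset! run r)
            (if (>= r n)
              (rf result item)
              result)))))))

(sequence (same-side-run 0 3) [1 2 3 4 -1 -2 -3 0 5])
;; => (3 4 -3)
```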

The above transducers store state in atoms, but as Alex Miller notes in this post, a transducer’s reducing function is only called from one thread at a time, so it’s safe to use volatile! or transient in place of atom.
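Swapping in a volatile is a small change. Here is a sketch of derivative rewritten with volatile! and vreset!; volatiles are a lighter-weight mutable box than atoms, without atomic compare-and-set, watches, or validators:

```clojure
(defn derivative [rf]
  ;; a volatile instead of an atom holds the last value seen
  (let [previous (volatile! nil)]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result item]
        (let [prev @previous]
          (vreset! previous item)
          (if (nil? prev)
            result
            (rf result (- item prev))))))))

(sequence derivative [2 1 6 9 8])
;; => (-1 5 3 -1)
```

The same substitution works for sliding-window, using vswap! where it calls swap!.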

Christophe Grand has some more sophisticated transducers in his xforms library.

If you have any interesting transducers of your own, I’d love to hear about them. You can find me on Twitter and GitHub.