Summary Statistics

Real-time applications frequently need to produce summaries of ingested data. An especially common requirement is producing multiple reports, one per entity within a system being monitored.

This guide illustrates how to solve problems of this nature by using Swim to:

Representation

Suppose we wish to monitor the health of multiple cell towers that continuously post their current state. Each update might look like the following (JSON):

{
  "tower_id": (unique identifier),
  "timestamp": (epoch in ms),
  "s_n_ratio": (signal-to-noise ratio),
  "disconnects": (failures since previous posted update),
  ... (possibly more fields)
}

Each tower requires a corresponding Web Agent that can receive and process these updates. A skeletal AbstractTowerAgent class could look like:

// AbstractTowerAgent.java
import swim.api.SwimLane;
import swim.api.agent.AbstractAgent;
import swim.api.lane.CommandLane;
import swim.structure.Value;

public abstract class AbstractTowerAgent extends AbstractAgent {

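  // Receives raw status updates and summarizes each one as it arrives.
  @SwimLane("addMessage")
  CommandLane<Value> addMessage = this.<Value>commandLane()
      .onCommand(v -> updateSummary(messageTimestamp(v), v));

  // Extracts the message's own timestamp; subclasses may override this.
  protected long messageTimestamp(Value v) {
    return v.get("timestamp").longValue();
  }

  // Folds one update into whatever summary state the subclass maintains.
  protected abstract void updateSummary(long timestamp, Value newValue);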

}

Stream-Optimized Algorithms

Efficiency is critical if we call updateSummary() on every incoming message, especially at high message volumes. Running offline algorithms against a time series that contains every received message can compute any (solvable) statistic of interest, but the memory and time costs of this approach grow linearly with the number of messages received.
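To make the cost difference concrete, here is a minimal plain-Java sketch that computes the same running mean both ways. The class names OfflineMean and StreamingMean are hypothetical and neither uses Swim APIs: the first rereads a stored history on every update, while the second keeps only a running sum and count.

```java
import java.util.ArrayList;
import java.util.List;

// Offline approach: keeps every sample, so memory grows with the message count
// and every update rereads the whole history (O(n) time per update).
class OfflineMean {
  private final List<Double> history = new ArrayList<>();

  double add(double x) {
    history.add(x);
    double sum = 0.0;
    for (double h : history) {
      sum += h;
    }
    return sum / history.size();
  }
}

// Streaming approach: keeps only a running sum and count, so each update
// costs O(1) time and the state never grows.
class StreamingMean {
  private double sum = 0.0;
  private long count = 0;

  double add(double x) {
    sum += x;
    count += 1;
    return sum / count;
  }
}
```

Both classes return the same means; only the streaming version keeps its memory footprint and per-message work constant as message volume grows.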

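Suppose that each TowerAgent must report the count, minimum, maximum, mean, and variance of its signal-to-noise ratio, as well as its total number of disconnects.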

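We can accomplish this without ever reading a time series. Upon receiving a status update, we compare the new signal-to-noise ratio against the current minimum and maximum, increment the message count, update the running mean (for example, as a rolling sum divided by the count) and variance, and add the reported disconnects to a running total.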

For cleanliness, let’s wrap this functionality in a separate class:

// TowerSummaryState.java
import swim.structure.Record;
import swim.structure.Value;

class TowerSummaryState {

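  // signal-to-noise ratio statistics
  private double min = Double.MAX_VALUE;
  // Start from the most negative finite double (Double.MIN_VALUE is the
  // smallest positive double, so -Double.MIN_VALUE is just below zero).
  private double max = -Double.MAX_VALUE;
  private int count = 0;
  private double mean = 0.0;
  // Sum of squared deviations from the running mean (Welford's accumulator)
  private double agg = 0.0;
  // total disconnects
  private int failures = 0;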

  public void addValue(double d, int f) {
    this.min = Math.min(this.min, d);
    this.max = Math.max(this.max, d);
    this.count += 1;
    // Welford's online algorithm for computing variance also handles mean.
    // The mean part may seem cryptic. Start from the fact that:
    //   newMean = (oldMean * (newCount - 1) + d) / newCount
    // Now let delta = d - oldMean, and these two lines should make sense.
    final double delta = d - this.mean;
    this.mean += delta / this.count;
    // delta is required to update Welford's algorithm's critical accumulator.
    this.agg += delta * (d - this.mean);

    this.failures += f;
  }

  public Value getSummary() {
    if (this.count == 0) {
      return Value.extant();
    }
    return Record.create(6)
        .slot("count", this.count)
        .slot("min", this.min)
        .slot("max", this.max)
        .slot("avg", this.mean)
        .slot("variance", this.agg / this.count)
        .slot("failures", this.failures);
    // Note: agg and count trivially transform into more than just variance:
    // - agg / count = variance
    // - sqrt(agg/count) = stdev
    // - agg / (count - 1) = sample variance
    // - sqrt(agg / (count - 1)) = sample stdev
  }

}
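Because TowerSummaryState depends on swim.structure, here is a dependency-free sketch of just its Welford logic that you can run on its own. The Welford class and its method names are hypothetical; it checks the online result against a conventional two-pass computation and applies the variance and standard deviation transformations listed in getSummary()'s comment.

```java
// Plain-Java copy of the mean/variance logic in TowerSummaryState.addValue(),
// plus a two-pass reference computation for comparison.
class Welford {
  private long count = 0;
  private double mean = 0.0;
  private double agg = 0.0; // sum of squared deviations from the running mean

  void add(double x) {
    count += 1;
    final double delta = x - mean;
    mean += delta / count;
    agg += delta * (x - mean);
  }

  long count() { return count; }
  double mean() { return mean; }
  double variance() { return agg / count; }             // population variance
  double stdev() { return Math.sqrt(variance()); }
  double sampleVariance() { return agg / (count - 1); } // needs count >= 2
  double sampleStdev() { return Math.sqrt(sampleVariance()); }

  // Two-pass reference: compute the mean first, then the squared deviations.
  static double twoPassVariance(double[] xs) {
    double sum = 0.0;
    for (double x : xs) {
      sum += x;
    }
    final double m = sum / xs.length;
    double sq = 0.0;
    for (double x : xs) {
      sq += (x - m) * (x - m);
    }
    return sq / xs.length;
  }
}
```

For the data {2, 4, 4, 4, 5, 5, 7, 9}, both approaches give a mean of 5 and a population variance of 4 (a standard deviation of 2), while the sample variance is 32/7.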

Note: The average could just as effectively have been calculated by dividing a rolling sum by the count, as suggested earlier. We chose Welford's approach only because we need delta to compute the variance anyway.

Note: Not every statistic can be optimized this perfectly. For example, computing an exact (non-heuristic) median over floating-point values requires reading past values at some point, leading to O(n) space complexity (though the choice of representation may still reduce time complexity).
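As an illustration of that trade-off, an exact running median can keep every value in two heaps: a max-heap for the smaller half and a min-heap for the larger half. The sketch below uses the hypothetical name RunningMedian and is a standard technique, not something TowerSummaryState implements. It still needs O(n) space, but each insertion costs O(log n) and each query O(1), rather than re-sorting the history on every message.

```java
import java.util.Collections;
import java.util.PriorityQueue;

// Exact running median via two heaps. Every value is retained (O(n) space).
class RunningMedian {
  // Max-heap holding the smaller half of the values.
  private final PriorityQueue<Double> lower = new PriorityQueue<>(Collections.reverseOrder());
  // Min-heap holding the larger half of the values.
  private final PriorityQueue<Double> upper = new PriorityQueue<>();

  void add(double x) {
    if (lower.isEmpty() || x <= lower.peek()) {
      lower.add(x);
    } else {
      upper.add(x);
    }
    // Rebalance so lower holds as many elements as upper, or exactly one more.
    if (lower.size() > upper.size() + 1) {
      upper.add(lower.poll());
    } else if (upper.size() > lower.size()) {
      lower.add(upper.poll());
    }
  }

  double median() {
    if (lower.isEmpty()) {
      throw new IllegalStateException("no values added");
    }
    if (lower.size() > upper.size()) {
      return lower.peek(); // odd count: middle element
    }
    return (lower.peek() + upper.peek()) / 2.0; // even count: average of middles
  }
}
```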

With TowerSummaryState in place, the TowerAgent implementation is very compact:

// TowerAgent.java
import swim.api.SwimLane;
import swim.api.lane.ValueLane;
import swim.recon.Recon;
import swim.structure.Value;

public class TowerAgent extends AbstractTowerAgent {

  private TowerSummaryState state;

  @SwimLane("summary")
  ValueLane<Value> summary = this.<Value>valueLane()
      .didSet((n, o) ->
          System.out.println(nodeUri() + ": updated summary to " + Recon.toString(n)));

  @Override
  protected void updateSummary(long timestamp, Value v) {
    this.state.addValue(v.get("s_n_ratio").doubleValue(),
        v.get("disconnects").intValue());
    this.summary.set(this.state.getSummary());
  }

  @Override
  public void didStart() {
    this.state = new TowerSummaryState();
  }

}

Note: While we discourage relying on it for computations, we could easily attach a time series of message history to TowerAgent (or to the variations described in the upcoming sections). Doing so would expose two sets of streaming APIs: one for comprehensive statistics, and one for individual events. This may be a valuable addition, especially when combined with a retention policy that keeps only recent or interesting historical events.
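As one possible retention policy, the hypothetical RecentHistory class below keeps only the most recent N events. It is a plain-Java sketch of the idea, not a Swim time-series API; a real deployment might instead retain events by age or by an "interesting event" predicate.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Bounded history: retains at most `capacity` events, evicting the oldest first.
class RecentHistory<T> {
  private final int capacity;
  private final Deque<T> events = new ArrayDeque<>();

  RecentHistory(int capacity) {
    if (capacity <= 0) {
      throw new IllegalArgumentException("capacity must be positive");
    }
    this.capacity = capacity;
  }

  void add(T event) {
    if (events.size() == capacity) {
      events.pollFirst(); // drop the oldest retained event
    }
    events.addLast(event);
  }

  // Copy of the retained events, oldest first.
  List<T> snapshot() {
    return new ArrayList<>(events);
  }
}
```

Memory stays bounded by the capacity regardless of message volume, which keeps the history a cheap complement to the streaming summaries rather than a replacement for them.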

Bucketed Summaries

Currently, each TowerAgent computes a single summary over its entire lifetime. We can instead choose to track multiple summaries, each representing some subset of the entity’s data. One common motivator for this is a need for time-based bucketing, i.e. separate summaries that each cover a non-overlapping time period (such as every 15 minutes).

To support this functionality, simply update TowerAgent to 1) maintain a Map of TowerSummaryStates (as opposed to a single TowerSummaryState), and 2) publish summaries to a MapLane instead of a ValueLane:

// BucketedTowerAgent.java
import java.util.HashMap;
import java.util.Map;
import swim.api.SwimLane;
import swim.api.lane.MapLane;
import swim.recon.Recon;
import swim.structure.Value;

public class BucketedTowerAgent extends AbstractTowerAgent {

  // one minute period so you can quickly test this yourself
  private static final long SAMPLE_PERIOD_MS = 60000L;

  // Each key to this HashMap is the top of the minute covered by the value.
  // For example the TowerSummaryState for the left-inclusive time period
  //   [2023-08-21 17:09:00, 2023-08-21 17:10:00) (in UTC)
  // will be under the key 1692637740000L.
  // Note that you could choose to expose this as a MapLane, too.
  private Map<Long, TowerSummaryState> summaryStates;

  // Same keys as the summaryStates map
  @SwimLane("summaries")
  MapLane<Long, Value> summaries = this.<Long, Value>mapLane()
      .didUpdate((k, n, o) ->
          System.out.println(nodeUri() + ": updated summary under " + k + " to " + Recon.toString(n)));

  @Override
  protected void updateSummary(long timestamp, Value v) {
    final long key = bucket(timestamp);
    // Create the bucket's accumulator on first use; later messages reuse it.
    final TowerSummaryState state =
        this.summaryStates.computeIfAbsent(key, k -> new TowerSummaryState());
    state.addValue(v.get("s_n_ratio").doubleValue(),
        v.get("disconnects").intValue());
    this.summaries.put(key, state.getSummary());
  }

  private static long bucket(long timestamp) {
    // Integer division then multiplication quickly purges non-significant digits.
    // For example:
    //   2023-08-21 17:09:34 = (in epoch milliseconds) 1692637774000
    //   1692637774000 / 60000 = 28210629
    //   28210629 * 60000 = 1692637740000
    // which matches the value in the summaryStates field's comment.
    // Java's / truncates toward zero, so this floors only non-negative
    // timestamps. Buckets also align to the Unix epoch (UTC), so some
    // SAMPLE_PERIOD_MS values (or local-time requirements) may not yield the
    // boundaries you expect; adjust as needed for your use case.
    return timestamp / SAMPLE_PERIOD_MS * SAMPLE_PERIOD_MS;
  }

  @Override
  public void didStart() {
    if (this.summaryStates != null) {
      this.summaryStates.clear();
    }
    this.summaryStates = new HashMap<>();
  }

}
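One caveat about bucket(): Java's / operator truncates toward zero, so timestamp / SAMPLE_PERIOD_MS * SAMPLE_PERIOD_MS floors only non-negative timestamps. A minimal variant using Math.floorDiv behaves identically for ordinary epoch timestamps and also rounds negative (pre-1970) ones down to their bucket start. The Buckets helper is hypothetical.

```java
// Bucketing with Math.floorDiv, which rounds toward negative infinity,
// instead of `/`, which truncates toward zero.
final class Buckets {
  private Buckets() {}

  // Returns the start (inclusive) of the period containing timestampMs.
  static long bucket(long timestampMs, long periodMs) {
    return Math.floorDiv(timestampMs, periodMs) * periodMs;
  }
}
```

With a 15-minute period (900000 ms), for example, 2023-08-21 17:09:34 UTC (1692637774000) lands in the bucket that starts at 17:00:00 UTC (1692637200000).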

Windowed Summaries

The logic in the previous section works even if messages arrive out of order, or if we receive a message whose timestamp is in the future. However, it requires maintaining multiple independent summary states at once, since any new message could target any one of them.

An application that uses system timestamps instead of message timestamps never encounters these situations. Consequently, it only needs one summary state accumulator; just make sure to reset it whenever time crosses into a new bucket.

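// WindowedTowerAgent.java
import swim.api.SwimLane;
import swim.api.lane.MapLane;
import swim.recon.Recon;
import swim.structure.Value;

public class WindowedTowerAgent extends AbstractTowerAgent {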

  private static final long SAMPLE_PERIOD_MS = 60000L;

  private TowerSummaryState currentState;
  private long currentBucket;

  @SwimLane("summaries")
  MapLane<Long, Value> summaries = this.<Long, Value>mapLane()
      .didUpdate((k, n, o) ->
          System.out.println(nodeUri() + ": updated summary under " + k + " to " + Recon.toString(n)));

  @Override
  protected long messageTimestamp(Value v) {
    // Pretend v lacks time information, so our best option is system time
    return System.currentTimeMillis();
  }

  @Override
  protected void updateSummary(long timestamp, Value v) {
    final long key = bucket(timestamp);
    if (key != this.currentBucket) {
      // Time has passed into a new bucket, reset accumulations
      resetState(timestamp);
    }
    this.currentState.addValue(v.get("s_n_ratio").doubleValue(),
        v.get("disconnects").intValue());
    this.summaries.put(key, this.currentState.getSummary());
  }

  private void resetState(long now) {
    this.currentState = new TowerSummaryState();
    this.currentBucket = bucket(now);
  }

  private static long bucket(long timestamp) {
    return timestamp / SAMPLE_PERIOD_MS * SAMPLE_PERIOD_MS;
  }

  @Override
  public void didStart() {
    resetState(System.currentTimeMillis());
  }

}
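To exercise the windowed reset logic without a running Swim server, here is a plain-Java sketch that takes timestamps as arguments instead of reading System.currentTimeMillis(). WindowedMean is a hypothetical name, and it tracks only a mean to stay short. Like WindowedTowerAgent, it assumes timestamps never move backward; an earlier timestamp would reset the accumulator and overwrite that bucket's summary.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Single accumulator that resets whenever the timestamp enters a new bucket.
class WindowedMean {
  private final long periodMs;
  // Latest mean per bucket start, analogous to the "summaries" MapLane.
  private final Map<Long, Double> summaries = new LinkedHashMap<>();
  private boolean started = false;
  private long currentBucket;
  private double sum;
  private long count;

  WindowedMean(long periodMs) {
    this.periodMs = periodMs;
  }

  void add(long timestampMs, double x) {
    final long key = Math.floorDiv(timestampMs, periodMs) * periodMs;
    if (!started || key != currentBucket) {
      // Time has passed into a new bucket: start a fresh accumulator.
      started = true;
      currentBucket = key;
      sum = 0.0;
      count = 0;
    }
    sum += x;
    count += 1;
    summaries.put(key, sum / count);
  }

  Map<Long, Double> summaries() {
    return summaries;
  }
}
```

Only one accumulator is alive at any time; closed buckets survive solely as their last published summary.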

Standalone Project

We encourage you to experiment with the standalone project that collects the information and code samples presented here. A few things to note:

The following swim-cli commands are available while the process runs (replace $ID instances with either 2350 or 2171):