Group and Aggregate Agents

It is common to want to group agents together and calculate some aggregate stats across that set of agents.

This pattern can really be extended to any kind of grouping where the members share some property, but some specific examples include:

Throughout this guide, for demonstration, we are going to build out the vehicle example, grouping the vehicles by the state (in the USA) they are currently in.

Let’s take a simple vehicle agent as a starting point, the main component of which is a ValueLane that stores the vehicle’s status.

public class VehicleAgent extends AbstractAgent {

  @SwimLane("addEvent")
  public CommandLane<Value> addEvent = this.<Value>commandLane()
      .onCommand(v -> this.status.set(v));

  @SwimLane("status")
  public ValueLane<Value> status = this.<Value>valueLane();

}

In this example the status of a vehicle is stored as a Swim Value but this can be changed to any supported type.

Group Agent

First we are going to construct a simple group/aggregate agent, in our example a StateAgent.

A JoinValueLane inside the StateAgent will become a map, the key being some unique identifier of a vehicle, and the value being the vehicle’s current status. The lane will keep itself up to date with the status of the vehicle through the use of downlinks.

public class StateAgent extends AbstractAgent {

  @SwimLane("vehicles")
  public JoinValueLane<Uri, Value> vehicles = this.<Uri, Value>joinValueLane();

}

Notice the type parameters of the lane in the above example.

Now we need some way for a vehicle to register itself with the state and join the lane. To do this we open a CommandLane named addVehicle accepting a Uri - which will be the vehicle’s node URI. The downlink(K key) method on the join value lane creates a downlink with the given unique key. We also specify the nodeUri and laneUri then start the downlink with open().

  @SwimLane("addVehicle")
  public CommandLane<Uri> addVehicle = this.<Uri>commandLane()
      .onCommand(v ->
          this.vehicles
              .downlink(v)
              .nodeUri(v)
              .laneUri("status")
              .open());

We now have a working StateAgent that can accept a vehicle’s node URI and will keep an up to date map of all registered vehicle’s status lanes.

See this in an agent in the example.

Joining

Next we need to add some logic to the VehicleAgent to command the StateAgent’s addVehicle lane we created in the previous section.

Assuming the URI pattern for state agents is /state/:name then we can create a joinState method in the VehicleAgent. The method will accept a state’s name and then command the state’s addVehicle lane with the vehicle’s node URI.

  private void joinState(final String state) {
    command(
        "/state/" + state,
        "addVehicle",
        Uri.form().mold(nodeUri()).toValue()
    );
  }

Note: We have to convert the vehicle’s node URI to a Value to send it via a command. For more information on this see Forms.

We now need to call the above method, exactly when you will want to do this will depend on the use case. The most common cases are:

Both approaches have a similar implementation, but we will show both.

Agent Start

We simply call joinState in the agent’s didStart() lifecycle callback.

  @Override
  public void didStart() {
    joinState("California");
  }

This will join the /state/California StateAgent on start up and any updates to the vehicle’s status lane will be reflected in the vehicles lane of the California state agent.

See this in an agent in the example.

On Event

We add a didSet callback to the status lane of the VehicleAgent, extracting the state field from the new status and passing it to the joinState method - if the state field is not present then pass null.

  @SwimLane("status")
  public ValueLane<Value> status = this.<Value>valueLane()
      .didSet((nv, ov) -> joinState(nv.get("state").stringValue(null)));

This introduces a problem; whenever the vehicle status is updated, the vehicle will try to join the state, even if it already has. Ideally we only want the vehicle to try to join if the state field in the status lane changes. We do this by keeping track of the state agent that the vehicle is currently joined and modifying joinState to only command if the state has changed.

  private String currentState;

  private void joinState(final String state) {
    if (isSameAsCurrentState(state)) {
      // If the new state is the same as the current state, then do nothing
      return;
    }
    if (state != null) {
      command(
          "/state/" + state,
          "addVehicle",
          Uri.form().mold(nodeUri()).toValue()
      );
    }
    this.currentState = state;
  }

  private boolean isSameAsCurrentState(final String state) {
    return (this.currentState == null && state == null) ||
        (this.currentState != null && this.currentState.equals(state));
  }

We now have a VehicleAgent that will join any StateAgent when the vehicle moves into it.

Note: You may have noticed that if the vehicle moves between states, then it will be joined to multiple StateAgents - we will address this in the next section.

See this in an agent in the example.

Leaving

Currently, vehicles join a new StateAgent whenever their status changes state, however, they do not leave the old one. We can fix this by implementing a way for the vehicles to leave or deregister from a state.

Much like the addVehicle lane on the StateAgent, we can add a removeVehicle lane. This, when commanded with a vehicle’s node URI, will remove it from the join lane and stop the downlink associated with it.

  @SwimLane("removeVehicle")
  public CommandLane<Uri> removeVehicle = this.<Uri>commandLane()
      .onCommand(v ->
          this.vehicles
              .remove(v));

Next, in the VehicleAgent we can command this lane right before we join a new state (in the joinState method).

  private void joinState(final String state) {
    if (isSameAsCurrentState(state)) {
      // If the new state is the same as the current state, then do nothing
      return;
    }

    if (this.currentState != null) {
      command("/state/" + this.currentState,
          "removeVehicle",
          Uri.form().mold(nodeUri()).toValue()
      );
    }

    if (state != null) {
      command(
          "/state/" + state,
          "addVehicle",
          Uri.form().mold(nodeUri()).toValue()
      );
    }

    this.currentState = state;
  }

We now have a VehicleAgent that can move between StateAgents whenever the state field in the status lane changes.

See this in an agent in the example.

Aggregating

Now we have an automatically updating, streaming map, we can continuously compute some metrics across the whole group. We can aggregate the status of all the vehicle agents into a status of the whole state.

Working entirely inside the StateAgent from now on, we add a few things:

  @SwimLane("vehicles")
  public JoinValueLane<Uri, Value> vehicles = this.<Uri, Value>joinValueLane()
      .didUpdate((k, nv, ov) -> computeStatus());

  @SwimLane("status")
  public ValueLane<Value> status = this.<Value>valueLane();

  private void computeStatus() {
    /// Compute and set the state's status
  }

Whenever a vehicle’s status changes, the status of the state will be recomputed.

All that’s left to do now is decide on the content of the state’s status. There are a various computations we can do to create an aggregated status, but we will show a few of the most common.

Each one is in the example.

Count

Getting a count of all vehicles in the state is the same as getting the size of the join value lane which can be done with the size() method on the lane. For any other count, for example, a count of vehicles currently moving, we can loop through the map, keeping a counter of all vehicles that meet this condition.

  private void computeStatus() {

    int movingVehicles = 0;
    for (final Uri vehicleUri : this.vehicles.keySet()) {
      if (this.vehicles.get(vehicleUri).get("isMoving").booleanValue(false)) {
        movingVehicles++;
      }
    }

    this.status.set(
        Record.create(2)
            .slot("vehicle_count", this.vehicles.size())
            .slot("moving_vehicle_count", movingVehicles)
    );
  }

Mean

Calculating a mean is done by looping through all the entries of the join value lane, keeping a total and dividing by the count at the end.

  private void computeStatus() {

    int totalSpeed = 0, movingVehicles = 0;

    for (final Uri vehicleUri : this.vehicles.keySet()) {
      totalSpeed += this.vehicles.get(vehicleUri).get("speed").intValue(0);

      if (this.vehicles.get(vehicleUri).get("isMoving").booleanValue(false)) {
        movingVehicles++;
      }
    }

    this.status.set(
        Record.create(2)
          .slot("mean_speed", this.vehicles.size() == 0 ? 0 : totalSpeed / this.vehicles.size())
          .slot("mean_speed_of_moving_vehicles", movingVehicles == 0 ? 0 : totalSpeed / movingVehicles)
    );
  }

Here we calculate the mean_speed across all vehicles and the mean_speed_of_moving_vehicles across a subset of the vehicles.

Max/Min

Finding the max or min of some value across all the entries in the join lane can be done by looping through the entries and keeping track of the max/min.

  private void computeStatus() {

    int topSpeed = 0;
    for (final Uri vehicleUri : this.vehicles.keySet()) {
      int currentVehiclesSpeed = this.vehicles.get(vehicleUri).get("speed").intValue(0);
      if (currentVehiclesSpeed > topSpeed) {
        topSpeed = currentVehiclesSpeed;
      }
    }

    this.status.set(
        Record.create(1)
          .slot("top_speed", topSpeed)
    );
  }