Group and Aggregate Agents

Overview

It is common to group a family of agents together to perform some data aggregation, and SwimOS’s Join Value and Join Map lanes make this possible. These lanes combine a set of homogeneous data into a single lane and, for every update the lane receives, execute some aggregation logic. This guide walks through aggregating agents and calculating statistics at different levels.

This functionality may be required in situations such as:

This guide will cover tracking cars throughout a city and focus on the implementation details of the agent design and the lifecycle event handlers and not the setup of generating the data; the complete code is available here.

Specifications

We’ll be building a Swim server application that details a number of data points:

All of the data across the city is simulated: values are generated at random within a fixed range, as data generation is incidental to what is being demonstrated here, which is aggregation.

Implementation

Each car is assigned to a random area, chosen using the rand crate, when its CarAgent first starts. This is modelled as follows:

use std::fmt::{Display, Formatter};
use std::str::FromStr;
use rand::prelude::SliceRandom;
use swimos_form::Form;

#[derive(Debug, Copy, Clone, Eq, PartialEq, Form)]
pub enum Area {
    Arbury,
    CherryHinton,
    KingsHedges,
    Petersfield,
}

impl Area {
    pub fn universe() -> [Area; 4] {
        [
            Area::Arbury,
            Area::CherryHinton,
            Area::KingsHedges,
            Area::Petersfield,
        ]
    }
}

impl Default for Area {
    fn default() -> Self {
        Area::select_random()
    }
}

impl FromStr for Area {
    type Err = ();

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "arbury" => Ok(Area::Arbury),
            "cherryhinton" => Ok(Area::CherryHinton),
            "kingshedges" => Ok(Area::KingsHedges),
            "petersfield" => Ok(Area::Petersfield),
            _ => Err(()),
        }
    }
}

impl Display for Area {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Area::Arbury => write!(f, "arbury"),
            Area::CherryHinton => write!(f, "cherryhinton"),
            Area::KingsHedges => write!(f, "kingshedges"),
            Area::Petersfield => write!(f, "petersfield"),
        }
    }
}

impl Area {
    pub fn select_random() -> Self {
        [
            Area::Arbury,
            Area::CherryHinton,
            Area::KingsHedges,
            Area::Petersfield,
        ]
        .choose(&mut rand::thread_rng())
        .copied()
        .expect("Slice was not empty")
    }
}
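The agent URIs later in this guide are built with format!("/area/{area}"), so the Display and FromStr implementations need to agree with each other. The following is a minimal, standard-library-only sketch of a check for that; the names ALL, area_node_uri and names_round_trip are hypothetical helpers introduced here and are not part of the guide's code, and the Form derive is omitted so that the sketch has no dependencies.

```rust
use std::fmt::{self, Display, Formatter};
use std::str::FromStr;

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum Area {
    Arbury,
    CherryHinton,
    KingsHedges,
    Petersfield,
}

impl Area {
    /// Every area, in declaration order (hypothetical helper).
    pub const ALL: [Area; 4] = [
        Area::Arbury,
        Area::CherryHinton,
        Area::KingsHedges,
        Area::Petersfield,
    ];
}

impl Display for Area {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let name = match self {
            Area::Arbury => "arbury",
            Area::CherryHinton => "cherryhinton",
            Area::KingsHedges => "kingshedges",
            Area::Petersfield => "petersfield",
        };
        f.write_str(name)
    }
}

impl FromStr for Area {
    type Err = ();

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Compare against the Display output so the two cannot drift apart.
        Area::ALL
            .iter()
            .copied()
            .find(|area| area.to_string() == s)
            .ok_or(())
    }
}

/// Builds the node URI used when addressing an area in this guide.
pub fn area_node_uri(area: Area) -> String {
    format!("/area/{area}")
}

/// Returns true when every area's Display output parses back to the same area.
pub fn names_round_trip() -> bool {
    Area::ALL
        .iter()
        .all(|area| area.to_string().parse::<Area>() == Ok(*area))
}
```

Deriving FromStr from the Display output is one possible design choice; it trades a small amount of runtime work for a single source of truth for the area names.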

The following sections walk through defining the agents from the bottom up, finishing with the top-level aggregation for the whole city.

Car Agent

The CarAgent tracks a vehicle’s current speed and registers with its assigned area so that the AreaAgent can aggregate the average speed for the area. It uses a Value Lane to store the car’s current speed and a Value Store to store the car’s assigned area. The CarAgent is defined as follows:

use swimos::agent::{AgentLaneModel, projections};
use swimos::agent::lanes::ValueLane;
use swimos::agent::stores::ValueStore;
use crate::area::Area;

#[projections]
#[derive(AgentLaneModel)]
pub struct CarAgent {
    speed: ValueLane<u64>,
    #[item(transient)]
    area: ValueStore<Area>,
}

#[derive(Debug, Clone)]
pub struct CarLifecycle;

Using the HandlerContext, a sequence of handlers may be executed with a fixed delay between them using the schedule_repeatedly function; these are henceforth referred to as ‘tasks’. Once the agent has started, two tasks will be scheduled using schedule_repeatedly to manage its state: one that updates the car’s current speed and one that reassigns the car’s area.

Both of these tasks can be scheduled in the agent’s on_start lifecycle event handler and no other handlers will be required for the agent.

Speed Handler

The speed handler generates a random speed within a fixed range (10 to 70 in this example) and updates the corresponding Value Lane with the generated value:

use rand::Rng;
use std::time::Duration;

let speed_handler = context.schedule_repeatedly(Duration::from_secs(5), move || {
    let mut rng = rand::rngs::OsRng;
    Some(context.set_value(CarAgent::SPEED, rng.gen_range(10..=70)))
});

Area Handler

The area handler works in a similar way: it randomly selects a new area for the agent and, if it differs from the current one, deregisters the car from its old area, registers it with the new one and stores the new area. The command type is an Action, which is covered later in this guide.

let area_handler = move |car_id: u64| {
    context.schedule_repeatedly(Duration::from_secs(5), move || {
        Some(context.get_value(CarAgent::AREA).and_then(move |old_area| {
            let new_area = Area::select_random();
            let handler = if old_area != new_area {
                // deregister this car with its current area
                let deregister_handler = context.send_command(
                    None,
                    format!("/area/{old_area}"),
                    "registrations".to_string(),
                    Action::Deregister(car_id),
                );
                // register this car with its new assigned area
                let register_handler = context.send_command(
                    None,
                    format!("/area/{new_area}"),
                    "registrations".to_string(),
                    Action::Register(car_id),
                );

                let handler = deregister_handler
                    .followed_by(register_handler)
                    .followed_by(context.set_value(CarAgent::AREA, new_area));
                Some(handler)
            } else {
                // noop handler as the car didn't switch area
                None
            };

            handler.discard()
        }))
    })
};
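The decision made inside the area handler can be looked at in isolation from the SwimOS runtime. The sketch below is a plain-Rust model, not the SwimOS API: area_change_commands is a hypothetical function that returns the node URI and command pairs in the order the handler above would send them, and Action is redeclared locally without the Form derive so that the example is self-contained.

```rust
/// Local stand-in for the guide's Action type (no Form derive here).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Action {
    Register(u64),
    Deregister(u64),
}

/// Returns the (node URI, command) pairs that an area change would produce,
/// in sending order. An unchanged area produces no commands.
pub fn area_change_commands(
    car_id: u64,
    old_area: &str,
    new_area: &str,
) -> Vec<(String, Action)> {
    if old_area == new_area {
        // The car did not move, so there is nothing to send.
        return Vec::new();
    }
    vec![
        // Deregister from the old area first...
        (format!("/area/{old_area}"), Action::Deregister(car_id)),
        // ...then register with the new one.
        (format!("/area/{new_area}"), Action::Register(car_id)),
    ]
}
```

For example, area_change_commands(7, "arbury", "petersfield") yields a Deregister(7) addressed to /area/arbury followed by a Register(7) addressed to /area/petersfield, while passing the same area twice yields an empty list.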

Complete Lifecycle

Putting it all together we get the following lifecycle for the CarAgent:

use std::str::FromStr;
use std::time::Duration;
use rand::Rng;
use swimos::{
    agent::agent_lifecycle::utility::HandlerContext,
    agent::event_handler::{EventHandler, HandlerActionExt},
    agent::lifecycle
};
use crate::{
    area::Area,
    car::{CarAgent, CarLifecycle}
};

#[lifecycle(CarAgent)]
impl CarLifecycle {
    #[on_start]
    pub fn on_start(&self, context: HandlerContext<CarAgent>) -> impl EventHandler<CarAgent> {
        let speed_handler = context.schedule_repeatedly(Duration::from_secs(5), move || {
            let mut rng = rand::rngs::OsRng;
            Some(context.set_value(CarAgent::SPEED, rng.gen_range(10..=70)))
        });
        let area_handler = move |car_id: u64| {
            context.schedule_repeatedly(Duration::from_secs(5), move || {
                Some(context.get_value(CarAgent::AREA).and_then(move |old_area| {
                    let new_area = Area::select_random();
                    let handler = if old_area != new_area {
                        // deregister this car with its current area
                        let deregister_handler = context.send_command(
                            None,
                            format!("/area/{old_area}"),
                            "registrations".to_string(),
                            Action::Deregister(car_id),
                        );
                        // register this car with its new assigned area
                        let register_handler = context.send_command(
                            None,
                            format!("/area/{new_area}"),
                            "registrations".to_string(),
                            Action::Register(car_id),
                        );

                        let handler = deregister_handler
                            .followed_by(register_handler)
                            .followed_by(context.set_value(CarAgent::AREA, new_area));
                        Some(handler)
                    } else {
                        // noop handler as the car didn't switch area
                        None
                    };

                    handler.discard()
                }))
            })
        };

        context
            .get_parameter("car_id")
            .map(|param: Option<String>| {
                let car_id = param.expect("Missing car_id URI parameter");
                u64::from_str(car_id.as_str()).expect("Failed to parse car ID into u64")
            })
            .and_then(area_handler)
            .followed_by(speed_handler)
    }
}
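The lifecycle above panics through expect if the car_id URI parameter is missing or is not a valid u64. As a sketch of an alternative, the parsing step could be factored into a function that returns a Result, leaving the caller to decide how to react. parse_car_id is a hypothetical helper and is not part of the guide's code or of SwimOS.

```rust
/// Parses the optional `car_id` URI parameter into a `u64`.
///
/// Returns a descriptive error message instead of panicking when the
/// parameter is absent or is not a valid unsigned integer.
pub fn parse_car_id(param: Option<&str>) -> Result<u64, String> {
    let raw = param.ok_or_else(|| "Missing car_id URI parameter".to_string())?;
    raw.parse::<u64>()
        .map_err(|err| format!("Failed to parse car ID {raw:?} into u64: {err}"))
}
```

Whether panicking or returning an error is preferable depends on the application; for a demo with generated routes, the expect calls used in the guide keep the code short.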

Area Agent

The AreaAgent tracks all of the cars within a given area and calculates their average speed. It exposes a ValueLane<f64> containing the average speed, which makes the average available to the CityAgent. This requires three lanes within the agent:

use crate::area::Area;
use swimos::{
    agent::lanes::{CommandLane, JoinValueLane, ValueLane},
    agent::{projections, AgentLaneModel},
};

#[projections]
#[derive(AgentLaneModel)]
pub struct AreaAgent {
    registrations: CommandLane<Action>,
    cars: JoinValueLane<u64, u64>,
    average_speed: ValueLane<f64>,
}

#[derive(Clone, Default)]
pub struct AreaLifecycle {
    area: Area,
}

impl AreaLifecycle {
    pub fn new(area: Area) -> AreaLifecycle {
        AreaLifecycle { area }
    }
}

Each AreaAgent is assigned its own area to manage when it is first instantiated; see the complete code for how this is handled here.

On Start

First, the agent’s on_start lifecycle event handler is defined to send a command to the CityAgent, which triggers it to listen to this agent’s average_speed lane for updates so that it can include this AreaAgent’s data in its calculations.

use std::str::FromStr;
use swimos::{
    agent::agent_lifecycle::utility::HandlerContext,
    agent::event_handler::{EventHandler, HandlerActionExt},
    agent::lifecycle,
};

#[lifecycle(AreaAgent)]
impl AreaLifecycle {
    #[on_start]
    pub fn on_start(&self, context: HandlerContext<AreaAgent>) -> impl EventHandler<AreaAgent> {
        context.send_command(None, "/aggregate", "register", self.area.to_string())
    }
}

Car Registration and Deregistration

A single Command Lane handles both registering and deregistering a car with the current agent. The two events are combined into a single enumeration with two variants, each carrying the car’s unique identifier; as this is a custom event type, it derives the Form trait. The unique identifier is used to either add or remove a downlink from the Join Value Lane:

use swimos::{
    agent::agent_lifecycle::utility::HandlerContext, agent::event_handler::EventHandler,
    agent::lifecycle,
};
use swimos_form::Form;

#[derive(PartialEq, Copy, Clone, Form)]
pub enum Action {
    Register(u64),
    Deregister(u64),
}

#[lifecycle(AreaAgent)]
impl AreaLifecycle {
    #[on_command(registrations)]
    pub fn registrations(
        &self,
        context: HandlerContext<AreaAgent>,
        action: &Action,
    ) -> impl EventHandler<AreaAgent> {
        match action {
            Action::Register(car_id) => context
                .add_downlink(
                    AreaAgent::CARS,
                    *car_id,
                    None,
                    format!("/cars/{car_id}").as_str(),
                    "speed",
                )
                .boxed(),
            Action::Deregister(car_id) => context.remove_downlink(AreaAgent::CARS, *car_id).boxed(),
        }
    }
}

This lifecycle event handler now creates a group of all of the cars that have been assigned to this area and is what enables aggregations to be performed.
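The effect of a stream of Register and Deregister commands on the group can be pictured with a simplified in-memory model. This is only a sketch of the membership bookkeeping and does not use the SwimOS API: AreaMembership is a hypothetical type, a HashSet stands in for the set of open downlinks, and Action is redeclared locally without the Form derive.

```rust
use std::collections::HashSet;

/// Local stand-in for the guide's Action type (no Form derive here).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Action {
    Register(u64),
    Deregister(u64),
}

/// Simplified model of which cars an area is currently linked to.
#[derive(Debug, Default)]
pub struct AreaMembership {
    linked: HashSet<u64>,
}

impl AreaMembership {
    /// Applies a command and reports whether the membership changed.
    pub fn apply(&mut self, action: Action) -> bool {
        match action {
            // Registering an already linked car changes nothing.
            Action::Register(car_id) => self.linked.insert(car_id),
            // Deregistering an unknown car changes nothing.
            Action::Deregister(car_id) => self.linked.remove(&car_id),
        }
    }

    /// Returns true when the car is currently part of the group.
    pub fn contains(&self, car_id: u64) -> bool {
        self.linked.contains(&car_id)
    }

    /// Number of cars currently in the group.
    pub fn len(&self) -> usize {
        self.linked.len()
    }
}
```

In this model, repeated commands for the same car are idempotent; how the real Join Value Lane treats a duplicate registration is not covered by this sketch.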

Average Speed

Now that the AreaAgent has an event stream of speeds coming from all of the agents, the Join Value Lane’s lifecycle event handler can compute the average speed of all of the linked cars each time a downlink receives an update. A Join Value Lane’s lifecycle event handler is provided with a reference to the underlying map which contains the current speed of each car and this may be used to compute the average. Once this has been calculated, the average_speed lane is updated:

use std::collections::HashMap;
use swimos::{
    agent::agent_lifecycle::utility::HandlerContext, agent::event_handler::EventHandler,
    agent::event_handler::HandlerActionExt, agent::lifecycle,
};

#[lifecycle(AreaAgent)]
impl AreaLifecycle {
    #[on_update(cars)]
    fn cars(
        &self,
        context: HandlerContext<AreaAgent>,
        speeds: &HashMap<u64, u64>,
        _key: u64,
        _prev: Option<u64>,
        _new_value: &u64,
    ) -> impl EventHandler<AreaAgent> {
        let average = speeds.values().sum::<u64>() as f64 / speeds.len() as f64;
        context.set_value(AreaAgent::AVERAGE_SPEED, average)
    }
}

To recap the event flow that leads to this handler being invoked: a CarAgent periodically changes its current area and registers with the corresponding AreaAgent so that it listens for events. The CarAgent also periodically sets its current speed and, as the AreaAgent has downlinked to its speed lane, the Join Value Lane’s on_update lifecycle event handler is invoked and a new average speed across the area can be calculated.
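The averaging step itself is ordinary Rust and can be tried outside an agent. The sketch below assumes the same HashMap<u64, u64> shape that the on_update handler receives; average_speed is a hypothetical free function, and unlike the handler it returns None for an empty map rather than dividing by zero, which for f64 would produce NaN.

```rust
use std::collections::HashMap;

/// Computes the mean of the speeds in the map, keyed by car identifier.
///
/// Returns `None` when no cars are present so that callers never see NaN.
pub fn average_speed(speeds: &HashMap<u64, u64>) -> Option<f64> {
    if speeds.is_empty() {
        return None;
    }
    let total: u64 = speeds.values().sum();
    Some(total as f64 / speeds.len() as f64)
}
```

Inside the handler an update has just been received, so the map is expected to hold at least one entry; the guard matters mainly when the same calculation is reused elsewhere.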

City Agent

The CityAgent calculates the average speed across all of the AreaAgents. The structure of this agent is similar to the AreaAgent except there is no deregister lane as registration is not transient. Three lanes are required in the agent:

The CityAgent is defined as:

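use swimos::{
    agent::lanes::{CommandLane, JoinValueLane, ValueLane},
    agent::{projections, AgentLaneModel},
};

#[projections]
#[derive(AgentLaneModel)]
pub struct CityAgent {
    // average speed reported by each AreaAgent, keyed by area name
    aggregated: JoinValueLane<String, f64>,
    // receives the name of each AreaAgent that wants to be aggregated
    register: CommandLane<String>,
    // average of the per-area averages, set by the aggregated lane's handler
    average_speed: ValueLane<f64>,
}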

#[derive(Clone, Default)]
pub struct CityLifecycle;

Aggregation

Similar to how the AreaAgent downlinks to the CarAgents, the CityAgent downlinks to the average_speed lane of each AreaAgent. This provides the agent with a continuous stream of events from the AreaAgents so that it can calculate an average speed for the whole city.

use std::collections::HashMap;
use swimos::{
    agent::event_handler::HandlerActionExt,
    agent::{agent_lifecycle::utility::HandlerContext, event_handler::EventHandler, lifecycle},
};

#[lifecycle(CityAgent)]
impl CityLifecycle {
    #[on_update(aggregated)]
    fn aggregated(
        &self,
        context: HandlerContext<CityAgent>,
        averages: &HashMap<String, f64>,
        _key: String,
        _prev: Option<f64>,
        _new_value: &f64,
    ) -> impl EventHandler<CityAgent> {
        let average = averages.values().sum::<f64>() / averages.len() as f64;
        context.set_value(CityAgent::AVERAGE_SPEED, average)
    }
}
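Note that the handler above averages the per-area averages, so every area contributes equally regardless of how many cars it contains. The sketch below contrasts that with a mean weighted by car count. Both functions are hypothetical, and the car counts are an assumption made for illustration: the AreaAgent in this guide does not publish how many cars it holds.

```rust
/// Unweighted mean of the per-area averages, as computed by the handler.
pub fn mean_of_means(averages: &[f64]) -> Option<f64> {
    if averages.is_empty() {
        return None;
    }
    Some(averages.iter().sum::<f64>() / averages.len() as f64)
}

/// Mean weighted by the number of cars in each area.
///
/// Each entry is (area average, number of cars in that area).
pub fn weighted_mean(areas: &[(f64, usize)]) -> Option<f64> {
    let cars: usize = areas.iter().map(|&(_, count)| count).sum();
    if cars == 0 {
        return None;
    }
    // Multiplying each average by its car count recovers the area's total speed.
    let total: f64 = areas.iter().map(|&(avg, count)| avg * count as f64).sum();
    Some(total / cars as f64)
}
```

With one car averaging 30 in one area and three cars averaging 60 in another, the unweighted result is 45 while the weighted result is 52.5. The two only coincide when every area holds the same number of cars.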

Area Registration

When an AreaAgent first starts, it registers with the CityAgent so that the CityAgent opens a downlink to receive updates. This is done using a Command Lane’s lifecycle event handler:

use std::collections::HashMap;
use swimos::{
    agent::event_handler::HandlerActionExt,
    agent::{agent_lifecycle::utility::HandlerContext, event_handler::EventHandler, lifecycle},
};

#[lifecycle(CityAgent)]
impl CityLifecycle {
    #[on_command(register)]
    pub fn register(
        &self,
        context: HandlerContext<CityAgent>,
        area_id: &String,
    ) -> impl EventHandler<CityAgent> {
        context.add_downlink(
            CityAgent::AGGREGATED,
            area_id.clone(),
            None,
            format!("/area/{}", area_id).as_str(),
            "average_speed",
        )
    }
}

Conclusion

The final design of this example application exposes three levels of resolution into the speed of cars within a city: at car, area and city levels. We recommend playing around with the complete code for this guide, available here, to understand how this design works and to view the routes available for the agents. There are a number of additions that could be made to this design: