Join Map Lanes

This page covers the specifics of Join Map Lanes and does not cover the more general aspects of Lanes. For more general information about lanes, see the Lane page. If you are not familar with Map Downlinks, it is recommended to first read the Map Downlinks guide.

Overview

A Join Map Lane provides a view into multiple Map Lanes by using Map Downlinks and reduces the state of the remote maps into a single map where an entry is owned by the link that most recently updated it. This allows you to aggregate the state of multiple data sources into a single data structure and be notified when an entry changes. It is not possible to directly mutate the state of the map and instead the value associated with a key is mutated by the underlying downlink receiving an event.

A Join Map Lane meets the following requirements:

For instances where a scalar structure is required, a Join Value Lane exists where the value associated with an entry is a scalar type.

Example: using a Join Map Lane to aggregate the state of multiple Map Lanes:


#[projections]
#[derive(AgentLaneModel)]
pub struct GeneratorAgent {
    state: MapLane<u32, u32>,
}

#[derive(Clone)]
pub struct GeneratorLifecycle;

#[lifecycle(GeneratorAgent)]
impl GeneratorLifecycle {
    #[on_start]
    pub fn on_start(
        &self,
        context: HandlerContext<GeneratorAgent>,
    ) -> impl EventHandler<GeneratorAgent> {
        // When this agent starts, schedule a task to update the state of
        // the lane so there is some data to work with in our ConsumerAgent.
        context.schedule_repeatedly(Duration::from_millis(500), move || {
            let mut rnd = rand::thread_rng();
            Some(context.update(GeneratorAgent::STATE, rnd.next_u32(), rnd.next_u32()))
        })
    }
}

#[projections]
#[derive(AgentLaneModel)]
pub struct ConsumerAgent {
    lanes: JoinMapLane<String, u32, u32>,
}

#[derive(Clone)]
pub struct ConsumerLifecycle;

#[lifecycle(ConsumerAgent)]
impl ConsumerLifecycle {
    #[on_start]
    pub fn on_start(
        &self,
        context: HandlerContext<ConsumerAgent>,
    ) -> impl EventHandler<ConsumerAgent> {
        // Issue commands to three different 'state' lanes so the
        // agents start.
        let commands = context
            .send_command(None, "/node/a", "state", 0)
            .followed_by(context.send_command(None, "/node/b", "state", 0))
            .followed_by(context.send_command(None, "/node/c", "state", 0));

        // Next, create the downlinks to the 'state' lanes and
        // associate each downlink with a key. This key will be
        // provided to any lifecycle event handlers so you can
        // uniquely identify the source of an event.
        let downlinks = context
            .add_map_downlink(
                // Projection to the JoinMapLane
                ConsumerAgent::LANES,
                // Unique key for this downlink
                "a".to_string(),
                // `None` as the target lane is local
                None,
                // Node that we're downlinking to
                "/node/a",
                // Lane that we're downlinking to
                "state",
            )
            .followed_by(context.add_map_downlink(
                ConsumerAgent::LANES,
                "b".to_string(),
                None,
                "/node/b",
                "state",
            ))
            .followed_by(context.add_map_downlink(
                ConsumerAgent::LANES,
                "c".to_string(),
                None,
                "/node/c",
                "state",
            ));
        // Chain the actions together into a single event handler
        // that will issue the commands and then open the downlinks.
        commands.followed_by(downlinks)
    }
}

Use cases

Join Map Lanes are suitable for situations where you want to aggregate the state of multiple lanes into a single map and are generally used to produce a wider view of a dataset which is spread across a number of agents. Common usecases for Join Map Lanes are:

Event handlers

Join Map Lanes use the same lifecycle event handlers as Map Lanes and provide an additional handler which can be registered to create a handler for the downlinks of the lane.

On Update

The on_update event handler has the following signature for a Join Map lane type of JoinMapLane<i32, i32, i32>:

#[on_event(lane_name)]
fn handler(
    &self,
    context: HandlerContext<ExampleAgent>,
    map: &HashMap<i32, i32>,
    downlink_key: i32,
    prev: Option<i32>,
    new_value: &i32,
) -> impl EventHandler<ExampleAgent> {
    //...
}

The handler is provided with a reference to the current state of the underlying map, the downlink key that was updated, the previous value associated with the key (if one existed) and a reference to the updated value.

Only one may be registered for the lane and it is invoked exactly once after an entry has been updated.

On Remove

The on_remove event handler has the following signature for a Join Map lane type of JoinMapLane<i32, i32, i32>:

#[on_remove(lane_name)]
fn on_remove(
    &self,
    context: HandlerContext<ExampleAgent>,
    map: &HashMap<i32, i32>,
    downlink_key: i32,
    prev: i32,
) -> impl EventHandler<ExampleAgent> {
    //...
}

The handler is provided with a reference to the current state of the underlying map, the downlink key that was updated and the value that was removed.

Only one may be registered for the lane and it is invoked exactly once after an entry has been removed.

Join Map Lifecycle

The join_map_lifecycle event handler has the following signature for a Join Map lane type of JoinMapLane<i32, i32, i32>:

#[join_map_lifecycle(lane_name)]
fn register_count_lifecycle(
    &self,
    context: JoinMapContext<ExampleAgent, i32, i32>,
) -> impl JoinMapLaneLifecycle<i32, i32, i32, ExampleAgent> + 'static {
    //...
}

The handler is invoked to create a new lifecycle event handler for each downlink used in the lane. The returned lifecycle may specify lifecycle event handlers for:

A complete example for a Join Map downlink lifecycle:

use swimos::{
    agent::agent_lifecycle::utility::JoinMapContext,
    agent::lanes::join_map::lifecycle::JoinMapLaneLifecycle,
    agent::lanes::LinkClosedResponse
};

#[join_map_lifecycle(lane_name)]
fn lifecycle(
    &self,
    context: JoinMapContext<ExampleAgent, i32, i32, i32>,
) -> impl JoinMapLaneLifecycle<i32, i32, ExampleAgent> + 'static {
    context
        .builder()
        .on_linked(move |context, key, remote| {
            let remote_fmt = remote.to_string();
            context.effect(move || {
                println!("{key} linked to {remote_fmt}");
            })
        })
        .on_synced(move |context, key, _remote, value| {
            let value = value.cloned();
            context.effect(move || {
                println!("{key} synced. State: {key:?} -> {value:?}");
            })
        })
        .on_unlinked(move |context, key, remote| {
            let remote_fmt = remote.to_string();
            context.effect(move || {
                println!("{key} unlinked to {remote_fmt}");
                // `Abandon` will remove the underlying link and maintain the data in the
                // underlying map.
                LinkClosedResponse::Abandon
            })
        })
        .on_failed(move |context, key, remote| {
            let remote_fmt = remote.to_string();
            context.effect(move || {
                println!("{key} failed to {remote_fmt}");
                // `Delete` will remove the underlying link and remove the data from the
                // underlying map.
                LinkClosedResponse::Delete
            })
        })
        .done()
}

Subscription

A subscription to a Join Map Lane can only be achieved via a Map Downlink. An example client for the first agent example:

use swimos_client::{BasicMapDownlinkLifecycle, RemotePath, SwimClientBuilder};

#[tokio::main]
async fn main() {
    let (client, task) = SwimClientBuilder::default().build().await;
    let _client_task = tokio::spawn(task);
    let handle = client.handle();

    let lifecycle = BasicMapDownlinkLifecycle::<String, u32>::default()
        .on_update_blocking(|key, _map, _previous, new| {
            println!("Entry updated: {key:?} -> {new:?}")
        })
        .on_removed_blocking(|key, _map, value| println!("Entry removed: {key:?} -> {value:?}"))
        .on_clear_blocking(|map| println!("Map cleared: {map:?}"));

    let _handle = handle
        .map_downlink::<String, u32>(RemotePath::new("ws://0.0.0.0:59673", "/consumer", "lanes"))
        .lifecycle(lifecycle)
        .open()
        .await
        .expect("Failed top open downlink");

    tokio::signal::ctrl_c()
        .await
        .expect("Failed to listen for ctrl-c.");
}

Further reading: Downlinks

Try It Yourself

A standalone project that demonstrates Join Map Lanes is available here.