Messaging

The messaging capability allows your application to subscribe to subjects to receive messages, publish payloads to a subject, or request a response from a subject. A subject is simply a string of characters that two services can use to find each other; to learn more, we recommend the NATS Subject-based messaging documentation. This capability enables chatbots, low-latency communication between microservices, direct messaging applications, and more. It is included as one of the open source wasmCloud interfaces and has the capability contract ID wasmcloud:messaging.
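
To make the idea of a subject more concrete, here is a minimal sketch of NATS-style subject matching, where subjects are dot-separated tokens, `*` matches exactly one token, and `>` matches one or more trailing tokens. The function name `subject_matches` is hypothetical and is not part of the wasmCloud or NATS APIs; the sketch does not validate subjects and is only meant to illustrate how a subscription pattern relates to a published subject.

```rust
/// Returns true if `subject` matches `pattern` under NATS-style wildcard rules.
/// Illustrative helper only; not part of any wasmCloud or NATS crate.
fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut sub = subject.split('.');
    loop {
        match (pat.next(), sub.next()) {
            // `>` needs at least one remaining subject token and must be
            // the final token of the pattern.
            (Some(">"), Some(_)) => return pat.next().is_none(),
            // `*` stands in for exactly one non-empty token.
            (Some("*"), Some(tok)) if !tok.is_empty() => continue,
            // Literal tokens must be identical.
            (Some(p), Some(s)) if p == s => continue,
            // Both sides ran out at the same time: full match.
            (None, None) => return true,
            // Different lengths or mismatched tokens.
            _ => return false,
        }
    }
}
```

With this sketch, `subject_matches("orders.*", "orders.created")` is true, while `subject_matches("orders.*", "orders.eu.created")` is false because `*` covers only a single token.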

Using this Capability

This capability can be used in your project by adding a dependency on the wasmcloud-interface-messaging crate, and by adding wasmcloud:messaging to the capabilities section in your application's wasmcloud.toml file.

Cargo.toml

toml
[dependencies]
wasmcloud-interface-messaging = "0.8.1"

wasmcloud.toml

toml
[actor]
capabilities = ["wasmcloud:messaging"]

Basic Operations

| Operation | Input | Output |
| --- | --- | --- |
| publish | PubMessage | Result that indicates success or failure |
| request | RequestMessage | ReplyMessage |
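
As a rough mental model of the two inputs, the sketch below uses local stand-in structs that mirror the field names appearing in this page's examples (`subject`, `body`, `reply_to`, `timeout_ms`). These are not the real wasmcloud-interface-messaging types, the `u32` width of `timeout_ms` is an assumption, and the helper names `publish_input` and `request_input` are hypothetical.

```rust
use std::time::Duration;

// Local stand-ins mirroring the fields used in this page's examples.
// They are NOT the real wasmcloud-interface-messaging types.
#[derive(Debug, Clone, PartialEq)]
struct PubMessage {
    subject: String,
    reply_to: Option<String>,
    body: Vec<u8>,
}

#[derive(Debug, Clone, PartialEq)]
struct RequestMessage {
    subject: String,
    body: Vec<u8>,
    timeout_ms: u32, // field width is an assumption
}

/// Input for `publish`: fire-and-forget, so no reply subject is set.
fn publish_input(subject: &str, body: &[u8]) -> PubMessage {
    PubMessage {
        subject: subject.to_owned(),
        reply_to: None,
        body: body.to_vec(),
    }
}

/// Input for `request`: the caller waits up to `timeout` for a reply.
fn request_input(subject: &str, body: &[u8], timeout: Duration) -> RequestMessage {
    // Saturate rather than wrap if the duration does not fit in u32 milliseconds.
    let timeout_ms = timeout.as_millis().min(u32::MAX as u128) as u32;
    RequestMessage {
        subject: subject.to_owned(),
        body: body.to_vec(),
        timeout_ms,
    }
}
```

The practical difference is that `publish` only reports whether sending succeeded, while `request` blocks the caller for up to `timeout_ms` milliseconds waiting for a reply.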

Unlike with other contracts, subscribing to subjects is done through links rather than through an operation. You can read more about this in our NATS configuration section.

Example Usage

Handle an Incoming Message from a Subscription

rust
use wasmbus_rpc::actor::prelude::*;
use wasmcloud_interface_logging::info;
use wasmcloud_interface_messaging::{MessageSubscriber, MessageSubscriberReceiver, SubMessage};

#[derive(Debug, Default, Actor, HealthResponder)]
#[services(Actor, MessageSubscriber)]
struct LogMessagingActor {}

#[async_trait]
impl MessageSubscriber for LogMessagingActor {
    /// Handle a message received on a subscription
    async fn handle_message(&self, _ctx: &Context, msg: &SubMessage) -> RpcResult<()> {
        info!("Received message: {:?}", msg);
        Ok(())
    }
}

Publish a Message

rust
use wasmbus_rpc::actor::prelude::*;
use wasmcloud_interface_messaging::{Messaging, MessagingSender, PubMessage};

/// Publish `body` to `subject` without expecting a reply
async fn publish_message(ctx: &Context, subject: &str, body: &[u8]) -> RpcResult<()> {
    if let Err(e) = MessagingSender::new()
        .publish(
            ctx,
            &PubMessage {
                body: body.to_vec(),
                // No reply subject: this is a one-way publish
                reply_to: None,
                subject: subject.to_owned(),
            },
        )
        .await
    {
        Err(format!("Could not publish message {}", e).into())
    } else {
        Ok(())
    }
}

Publish a Message and Request a Reply

rust
use wasmbus_rpc::actor::prelude::*;
use wasmcloud_interface_messaging::{Messaging, MessagingSender, RequestMessage};

/// Send `body` to `subject` and wait up to one second for a reply
async fn message_request(ctx: &Context, subject: &str, body: &[u8]) -> RpcResult<()> {
    // On success the reply is discarded; this example only reports failure
    if let Err(e) = MessagingSender::new()
        .request(
            ctx,
            &RequestMessage {
                body: body.to_vec(),
                subject: subject.to_owned(),
                timeout_ms: 1_000,
            },
        )
        .await
    {
        Err(format!("Could not request message {}", e).into())
    } else {
        Ok(())
    }
}
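
A request only succeeds if some subscriber answers on the reply subject attached to the message. The sketch below illustrates the responder side of that pattern using local stand-in structs rather than the real interface types. The field layout of `SubMessage` is an assumption (this page does not show it), and `build_reply` is a hypothetical helper, not a function provided by wasmcloud-interface-messaging.

```rust
// Local stand-ins for illustration only; not the real interface types.
// The fields of SubMessage are assumed.
#[derive(Debug, Clone, PartialEq)]
struct SubMessage {
    subject: String,
    reply_to: Option<String>,
    body: Vec<u8>,
}

#[derive(Debug, Clone, PartialEq)]
struct PubMessage {
    subject: String,
    reply_to: Option<String>,
    body: Vec<u8>,
}

/// Builds the message a responder would publish to answer `incoming`.
/// Returns `None` when the sender did not ask for a reply.
fn build_reply(incoming: &SubMessage, body: Vec<u8>) -> Option<PubMessage> {
    incoming.reply_to.as_ref().map(|reply_subject| PubMessage {
        // The reply goes to the subject the requester is listening on.
        subject: reply_subject.clone(),
        reply_to: None,
        body,
    })
}
```

In an actor, a message handler could call a helper like this and, when it returns `Some`, pass the result to the publish operation shown earlier.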

Choosing an Implementation

After you've written your component to use the messaging capability, you choose a capability provider at runtime to connect it to a real messaging implementation.

Available Implementations

Below you can find the implementations of messaging that you can use out of the box. If these options don't meet your needs, you can always implement the wasmcloud:messaging interface yourself. Please reach out to us on Discord so we can collaborate with you on a solution.