Service

A look at the service

Introduction

The service is the core of the library: it receives requests, runs the registered handlers, and publishes the responses. It relies on lapin for the AMQP protocol and on tokio for the async runtime and task spawning. Tokio runs one delegate per vCPU to handle requests and responses. As an optimisation under heavy load, the library publishes each response from a separately spawned async task, so any free vCPU can take over the publish step.
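The division of work described above can be modelled without any dependencies. The sketch below is not the library's code: it uses OS threads and std channels in place of tokio tasks and lapin, and the name serve_all and the "reply:" prefix are hypothetical. It only illustrates the idea of one worker per core, with the publish step handed off so the worker can move on to the next request.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Handles every request on a fixed pool of workers (one per available core)
/// and hands each publish step to a separately spawned thread.
/// Returns the published replies in sorted order.
fn serve_all(requests: Vec<String>) -> Vec<String> {
    let workers = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);

    let (req_tx, req_rx) = mpsc::channel::<String>();
    let req_rx = Arc::new(Mutex::new(req_rx));
    let (pub_tx, pub_rx) = mpsc::channel::<String>();

    let mut pool = Vec::with_capacity(workers);
    for _ in 0..workers {
        let req_rx = Arc::clone(&req_rx);
        let pub_tx = pub_tx.clone();
        pool.push(thread::spawn(move || loop {
            // Take the next request; stop once the queue is closed and drained.
            let next = req_rx.lock().unwrap().recv();
            let request = match next {
                Ok(request) => request,
                Err(_) => break,
            };
            // Stand-in for running the handler.
            let response = format!("reply:{}", request);
            // Publish elsewhere so this worker is free for the next request.
            let pub_tx = pub_tx.clone();
            thread::spawn(move || {
                let _ = pub_tx.send(response);
            });
        }));
    }
    // Drop the original sender so the reply channel closes when all work is done.
    drop(pub_tx);

    for request in requests {
        req_tx.send(request).unwrap();
    }
    drop(req_tx); // closing the queue lets the workers exit
    for worker in pool {
        worker.join().unwrap();
    }

    let mut replies: Vec<String> = pub_rx.iter().collect();
    replies.sort();
    replies
}
```

Replies can arrive in any order in this model, which is why the function sorts them before returning.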

Handler shape

A service handler is registered on an RpcService either via the #[girolle] macro or by handing the service an RpcTask. Internally each handler is an async closure of the shape:

Arc<dyn Fn(RpcContext, Payload) -> BoxFuture<GirolleResult<Value>> + Send + Sync>

The handler receives the per-delivery RpcContext and a Payload carrying positional args and kwargs, and returns a future resolving to the JSON result. Errors propagate back to the caller as GirolleError and are serialized into a Nameko-compatible RemoteError.
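To make the type-erased handler shape concrete, here is a minimal, dependency-free sketch. The RpcContext, Payload, Value and HandlerResult definitions below are simplified stand-ins rather than the library's real types, and hello_handler, invoke and block_on_ready are hypothetical helpers. A real handler would more likely return Box::pin(async move { ... }); an already-completed future is used here only so the sketch runs without an async runtime.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-ins for the library's types; every field here is an assumption.
#[derive(Clone, Debug, Default)]
struct RpcContext {
    method_name: String,
}
#[derive(Clone, Debug, Default)]
struct Payload {
    args: Vec<String>,
    kwargs: HashMap<String, String>,
}
type Value = String; // stand-in for a JSON value
type HandlerResult = Result<Value, String>; // stand-in for GirolleResult<Value>

type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
type Handler = Arc<dyn Fn(RpcContext, Payload) -> BoxFuture<HandlerResult> + Send + Sync>;

/// Builds a handler that greets the first positional argument,
/// using the optional `greeting` keyword argument.
fn hello_handler() -> Handler {
    Arc::new(|ctx: RpcContext, payload: Payload| -> BoxFuture<HandlerResult> {
        let greeting = payload
            .kwargs
            .get("greeting")
            .cloned()
            .unwrap_or_else(|| "Hello".to_string());
        let result = match payload.args.first() {
            Some(name) => Ok(format!("{}, {} (from {})", greeting, name, ctx.method_name)),
            None => Err("missing positional argument `name`".to_string()),
        };
        Box::pin(std::future::ready(result))
    })
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future to completion on the current thread (busy-polling, demo only).
fn block_on_ready<T>(mut fut: BoxFuture<T>) -> T {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        std::thread::yield_now();
    }
}

/// Calls a handler and waits for its result.
fn invoke(handler: &Handler, ctx: RpcContext, payload: Payload) -> HandlerResult {
    block_on_ready((**handler)(ctx, payload))
}
```

Because the handler is stored behind an Arc and is Send + Sync, the same value can be cloned cheaply and called from any worker.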

RpcContext

RpcContext carries the inbound delivery's metadata and two capability handles:

| Field | Meaning |
| --- | --- |
| service_name | the inbound routing key's service component |
| method_name | the inbound routing key's method component |
| correlation_id | the AMQP correlation id of the inbound delivery |
| reply_to | the requester's reply queue routing key |
| headers | the raw inbound headers (incl. nameko.call_id_stack) |
| rpc | an RpcCaller for outbound RPC, stamped with the inbound headers |
| events | an EventDispatcher for emitting events, stamped with the inbound headers |

Both rpc and events propagate the inbound nameko.call_id_stack on outbound messages, so distributed traces stay consistent across hops.
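As an illustration of how these fields relate to a delivery, the sketch below splits a routing key of the form service.method into its two components and copies the call ID stack from inbound to outbound headers. It is a simplified model, not the library's implementation: the Headers alias, split_routing_key and stamp_outbound are hypothetical names, real AMQP headers are typed field tables rather than string lists, and whether other inbound headers are forwarded as well is not modelled here.

```rust
use std::collections::HashMap;

const CALL_ID_STACK: &str = "nameko.call_id_stack";

/// Simplified header map: each header holds a list of strings.
type Headers = HashMap<String, Vec<String>>;

/// Splits a routing key such as "video.hello" into ("video", "hello").
/// Returns None when either component is missing.
fn split_routing_key(routing_key: &str) -> Option<(&str, &str)> {
    let (service, method) = routing_key.split_once('.')?;
    if service.is_empty() || method.is_empty() {
        return None;
    }
    Some((service, method))
}

/// Builds headers for an outbound message that carry the inbound call ID stack,
/// so the next hop can continue the same trace.
fn stamp_outbound(inbound: &Headers) -> Headers {
    let mut outbound = Headers::new();
    if let Some(stack) = inbound.get(CALL_ID_STACK) {
        outbound.insert(CALL_ID_STACK.to_string(), stack.clone());
    }
    outbound
}
```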

In-service RPC

ctx.rpc.call(service, method, payload).await issues an outbound RPC and awaits the reply. Internally, the RpcService stands up a single shared reply queue, a DashMap<correlation_id, oneshot::Sender> correlation map, and a publish channel — all wired up automatically when the service starts.
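The correlation map is the piece that lets many concurrent calls share one reply queue. The following sketch models that pattern with the standard library only: a Mutex<HashMap> stands in for the DashMap and an mpsc channel stands in for the oneshot channel. PendingReplies, register and resolve are hypothetical names chosen for illustration, and replies are plain strings rather than AMQP deliveries.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Mutex;

/// Tracks callers that are waiting for a reply, keyed by correlation id.
#[derive(Default)]
struct PendingReplies {
    waiting: Mutex<HashMap<String, Sender<String>>>,
}

impl PendingReplies {
    /// Called before publishing a request: records the correlation id and
    /// returns the receiving end on which the caller waits for its reply.
    fn register(&self, correlation_id: &str) -> Receiver<String> {
        let (tx, rx) = mpsc::channel();
        self.waiting
            .lock()
            .unwrap()
            .insert(correlation_id.to_string(), tx);
        rx
    }

    /// Called by the reply-queue consumer: routes a reply to the waiting caller.
    /// Returns false when no caller is waiting for this correlation id.
    fn resolve(&self, correlation_id: &str, reply: String) -> bool {
        // Removing the entry ensures each correlation id is answered at most once.
        let sender = self.waiting.lock().unwrap().remove(correlation_id);
        match sender {
            Some(tx) => tx.send(reply).is_ok(),
            None => false,
        }
    }
}
```

Replies may arrive in any order; each one reaches only the caller that registered the matching correlation id, and stray or duplicate replies are reported rather than delivered.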

Events

Service handlers can publish Nameko-compatible events with ctx.events.dispatch(source_service, event_type, &payload).await. The {source_service}.events topic exchange is declared lazily on first emit per source, and messages are published as durable JSON.
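The lazy declaration can be pictured as a set of already-declared exchanges that is consulted on every emit. The model below records what would be sent instead of talking to a broker. EventDispatcherModel and Published are hypothetical types; using the event type as the routing key follows the Nameko convention and is an assumption about the library, and delivery mode 2 is the AMQP value for persistent messages.

```rust
use std::collections::HashSet;

/// A message as it would be handed to the broker.
#[derive(Debug, PartialEq)]
struct Published {
    exchange: String,
    routing_key: String,
    body: String,
    delivery_mode: u8,
}

/// In-memory model of an event dispatcher with lazy exchange declaration.
#[derive(Default)]
struct EventDispatcherModel {
    declared: HashSet<String>,
    declare_calls: usize,
    published: Vec<Published>,
}

impl EventDispatcherModel {
    /// Emits one event, declaring the `{source_service}.events` exchange
    /// only the first time that source is seen.
    fn dispatch(&mut self, source_service: &str, event_type: &str, json_body: &str) {
        let exchange = format!("{}.events", source_service);
        // HashSet::insert returns true only when the value was not yet present.
        if self.declared.insert(exchange.clone()) {
            // A real dispatcher would declare the topic exchange on the broker here.
            self.declare_calls += 1;
        }
        self.published.push(Published {
            exchange,
            routing_key: event_type.to_string(), // assumption: Nameko convention
            body: json_body.to_string(),
            delivery_mode: 2, // persistent
        });
    }
}
```

Dispatching several events from the same source therefore costs one declaration, however many messages follow.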

A service can also subscribe to events emitted by other services with RpcService::subscribe(source_service, event_type, handler). The handler takes (RpcContext, Value) and returns a future, mirroring the shape of RPC handlers, with the event payload passed as a Value. Subscriptions and register(...) calls can coexist on the same service; either alone is also valid.