End to end overview
This document will use Valkey as an example for explaining the end to end flow of messages through Shotover. We will walk through a single request from the client and Valkey's response. The same flow within Shotover is used for all protocols, so this document should still be useful if you are working with another protocol.
The general flow of messages through Shotover looks like:

Shotover Config
In the scenario this document works through, Shotover will be run with the following topology.yaml configuration:
sources:
  - Valkey:
      name: "valkey"
      listen_addr: "127.0.0.1:6379"
      chain:
        # A made up transform for our example.
        - SomeTransform
        # Another made up transform for our example
        - AnotherTransform
        # The sink transform that routes Valkey requests to the correct node in the cluster
        - ValkeySinkCluster:
            first_contact_points:
              - "172.16.1.2:6379"
              - "172.16.1.3:6379"
              - "172.16.1.4:6379"
            connect_timeout_ms: 3000
The client sends request
A user sends a Valkey command through their Valkey client:
- The user runs a set command by calling: client.set("foo", "bar").
- The client translates the set(..) arguments into a RESP request that looks like: ["SET", "foo", "bar"]
- A hash is taken of the key foo which is used to choose which Shotover node to send the request to.
- The RESP request is converted into the RESP wire format, which is purely ASCII except for user data:
*3
$3
SET
$3
foo
$3
bar
*3 means an array with 3 elements.
The first element is $3\nSET, which means a string of length 3 containing SET.
The second and third arguments are also strings of length 3: $3\nfoo and $3\nbar
- The bytes of the message are sent over a TCP connection to the chosen Shotover node. In this example, no such connection exists so a new one is made.
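As a rough illustration of the conversion into the wire format, the sketch below encodes a command as a RESP array of bulk strings. The function name encode_resp_command is made up for this example and does not come from any particular Valkey client. Note that on the wire every line is terminated by \r\n; the listing above shows those terminators as plain line breaks.

```rust
/// Encode a command as a RESP array of bulk strings.
/// Hypothetical helper for illustration only; real clients ship their own encoders.
fn encode_resp_command(args: &[&str]) -> Vec<u8> {
    let mut out = Vec::new();
    // `*<count>` announces an array with <count> elements.
    out.extend_from_slice(format!("*{}\r\n", args.len()).as_bytes());
    for arg in args {
        // `$<len>` announces a bulk string of <len> bytes, followed by the bytes themselves.
        out.extend_from_slice(format!("${}\r\n", arg.len()).as_bytes());
        out.extend_from_slice(arg.as_bytes());
        out.extend_from_slice(b"\r\n");
    }
    out
}
```

Calling encode_resp_command(&["SET", "foo", "bar"]) yields the bytes of the example request.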
Shotover accepts a new connection
When ValkeySource is created during Shotover startup, it creates a TcpCodecListener and then calls TcpCodecListener::run which listens in a background task for incoming TCP connections on the source's configured port.
TcpCodecListener accepts a new connection from the Valkey client and constructs and runs a Handler type, which manages the connection.
The Handler type creates:
- read/write tasks around the TCP connection.
  - A ValkeyEncoder and ValkeyDecoder pair is created from ValkeyCodecBuilder.
    - The ValkeyEncoder is given to the write task
    - The ValkeyDecoder is given to the read task
- a new transform chain instance to handle the requests coming in from this connection.
  - This transform chain instance handles a single connection passing from the client to Valkey and isolates it from other connections.
The handler type then continues to run, routing requests and responses between the transform chain and the client connection read/write tasks.
Finally, at this point our call stack will look something like this. Each section of this document will include such a diagram, showing a high level representation of the call stack during the section.
block-beta
block:main["Call Stack\n\n\n\n\n"]
columns 1
space
ValkeySource
IncomingConnectionTask
end
Source ValkeyDecoder
The tokio_util crate provides an Encoder trait and a Decoder trait.
Through this interface:
- we provide the logic for how to encode and decode messages into and out of a buffer of bytes by implementing the traits.
- tokio provides the logic for reading and writing the bytes from the actual TCP connection via the FramedWrite and FramedRead types.
Since TCP itself provides a stream of bytes without any application level framing, it is up to the database protocol itself to implement framing on top of TCP. (Framing is how a protocol defines where individual messages begin and end.)
So the logic of a Decoder implementation must gracefully handle incomplete messages, leaving any half received messages in the buffer.
Protocols like Kafka and Cassandra achieve framing by including a message length in bytes in the header. This is great for Shotover since it means we can avoid parsing the entire message when it's not needed. However Valkey does not have a header so we always need to parse the entire message to find out where it ends.
The ValkeyDecoder is an example of a Decoder implementation.
Let's step through the ValkeyDecoder implementation:
Reading a message
The first thing ValkeyDecoder::decode does is attempt to parse a Valkey message from the beginning of the bytes.
This is done by calling decode_bytes_mut from the redis-protocol crate.
There are a few possible return values:
- Failure to parse because the message is not fully received yet - in this case we return None so that the FramedRead will call us again when more bytes have been received.
- Any other kind of parse error - we bubble up the error, eventually resulting in the connection being terminated.
- A message is successfully returned - we continue on and pass the message to the next stage.
  - In this case the parsed message forms a structure of:
ValkeyFrame::Array(vec![
    ValkeyFrame::BulkString("SET"),
    ValkeyFrame::BulkString("foo"),
    ValkeyFrame::BulkString("bar"),
])
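The three outcomes above can be sketched with a small standard-library-only parser. This is not the real ValkeyDecoder and it does not use tokio_util or the redis-protocol crate: Frame, ParseError, parse_frame and decode are simplified stand-ins invented for this example, and only arrays and bulk strings are handled. The shape of decode follows the same convention as a Decoder: Ok(None) means "call me again when more bytes arrive" and leaves the buffer untouched.

```rust
#[derive(Debug, PartialEq)]
enum Frame {
    Array(Vec<Frame>),
    BulkString(Vec<u8>),
}

#[derive(Debug, PartialEq)]
struct ParseError(String);

/// Returns the `\r\n` terminated line starting at `pos` (without the terminator)
/// and the position just after it, or `None` if the terminator has not arrived yet.
fn read_line(buf: &[u8], pos: usize) -> Option<(&[u8], usize)> {
    let rest = buf.get(pos..)?;
    let end = rest.windows(2).position(|w| w == b"\r\n")?;
    Some((&rest[..end], pos + end + 2))
}

fn parse_len(digits: &[u8]) -> Result<usize, ParseError> {
    std::str::from_utf8(digits)
        .ok()
        .and_then(|s| s.parse::<usize>().ok())
        .ok_or_else(|| ParseError("invalid length".to_string()))
}

/// Ok(None): incomplete. Err: malformed. Ok(Some): a frame and the position after it.
fn parse_frame(buf: &[u8], pos: usize) -> Result<Option<(Frame, usize)>, ParseError> {
    let Some((line, mut pos)) = read_line(buf, pos) else {
        return Ok(None);
    };
    match line.first() {
        Some(b'*') => {
            let count = parse_len(&line[1..])?;
            let mut items = Vec::new();
            for _ in 0..count {
                // Without a length header we must parse every element to find the end.
                let Some((item, next)) = parse_frame(buf, pos)? else {
                    return Ok(None);
                };
                items.push(item);
                pos = next;
            }
            Ok(Some((Frame::Array(items), pos)))
        }
        Some(b'$') => {
            let len = parse_len(&line[1..])?;
            // The payload and its trailing `\r\n` must both be buffered.
            if buf.len() - pos < len.saturating_add(2) {
                return Ok(None);
            }
            let end = pos + len;
            if &buf[end..end + 2] != b"\r\n" {
                return Err(ParseError("bulk string is not terminated by CRLF".to_string()));
            }
            Ok(Some((Frame::BulkString(buf[pos..end].to_vec()), end + 2)))
        }
        _ => Err(ParseError("unsupported frame type".to_string())),
    }
}

/// Consumes one complete frame from the front of `buf`, if there is one.
fn decode(buf: &mut Vec<u8>) -> Result<Option<Frame>, ParseError> {
    match parse_frame(buf, 0)? {
        Some((frame, consumed)) => {
            buf.drain(..consumed);
            Ok(Some(frame))
        }
        // Half received messages stay in the buffer for the next call.
        None => Ok(None),
    }
}
```

Feeding decode the first half of the SET request returns Ok(None); once the remaining bytes are appended, the same call returns the full array frame and empties the buffer.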
Constructing a Message
All messages in Shotover are stored in a Message type which is passed through each transform.
Message abstracts over all the different protocols supported by Shotover.
The ValkeyDecoder constructs a message by calling Message::from_bytes_and_frame_at_instant.
We pass in the raw bytes of the message and the parsed frame of the message, as well as a timestamp which is used purely for metrics.
Protocols with better framing mechanisms will use a different constructor to avoid parsing the whole request unless it's really needed.
When the Message is created a new ID is generated and stored in the Message. This ID is a randomly generated 128-bit integer used by transforms to match responses with their corresponding requests. This value is meaningful only within Shotover and is not part of the Valkey protocol.
Let's say in this example our message is assigned the ID 0xd12ac2704d19e53ef3fea94b4885c950.
ValkeyDecoder::decode then returns the Message to the caller, which in this case is the tokio_util helper FramedRead.
Since the ValkeyDecoder runs on the read task the call stack is independent from the main call stack.
block-beta
block:main["Call Stack\n\n\n\n\n\n\n"]
columns 1
space
ValkeyDecoder
FramedReader
IncomingReadTask
end
Codec to transform glue
The Message then goes through a few steps before it actually reaches a transform.
- The read task created by the Handler
  - The message is read from the FramedRead
  - The message is sent through a tokio channel
  - This logic is specifically run in a separate task to enable decoding of incoming requests to run in parallel with any messages currently being processed by transforms (calling tokio async code will execute on the same core unless a task is used)
- The Handler::run_loop method loops for the lifetime of the incoming connection and:
  - Listens for requests from the read task over the channel
  - If there are any requests, all pending requests are collected into a batch (Vec<Message>). In our case the client is sending requests serially, waiting for responses each time. So this batch will contain only a single request.
  - Creates a ChainState. ChainState contains all the chain level state accessed by transforms. This includes things like the batch of requests, the IP address and port the client connected to, and a flag to allow transforms to force close the connection. Transforms are free to alter the ChainState and the next transform in the chain will receive the same altered ChainState.
  - Calls TransformChain::process_request passing it the ChainState.
- TransformChain::process_request:
  - Inserts the list of transforms in the chain into ChainState
  - Calls ChainState::call_next_transform
- ChainState::call_next_transform:
  - Pops the first transform from the list of transforms.
  - Calls the transform's transform method, beginning execution of the transform.
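The last two steps can be sketched as follows. This is a synchronous toy model, not Shotover's actual code: Message is reduced to a String, the trait is not async, and Tag, EchoSink and process_request are invented for the example. What it tries to show is the mechanism itself: the chain state holds the transforms that have not run yet, and call_next_transform pops the first one and hands it the same chain state.

```rust
/// Stand-in for Shotover's `Message`; a plain string keeps the sketch small.
type Message = String;

trait Transform {
    fn transform(&mut self, chain_state: &mut ChainState<'_>) -> Vec<Message>;
}

struct ChainState<'a> {
    requests: Vec<Message>,
    /// The transforms that have not run yet for this batch.
    transforms: &'a mut [Box<dyn Transform>],
}

impl ChainState<'_> {
    fn call_next_transform(&mut self) -> Vec<Message> {
        // Take the remaining transforms out of `self` so that the popped
        // transform can be given mutable access to `self`.
        let transforms = std::mem::take(&mut self.transforms);
        let (next, rest) = transforms
            .split_first_mut()
            .expect("the chain must end with a sink that does not call the next transform");
        self.transforms = rest;
        next.transform(self)
    }
}

/// A pass-through transform that marks each request it sees (hypothetical).
struct Tag(&'static str);

impl Transform for Tag {
    fn transform(&mut self, chain_state: &mut ChainState<'_>) -> Vec<Message> {
        for request in &mut chain_state.requests {
            request.push_str(self.0);
        }
        // The next transform receives the altered chain state.
        chain_state.call_next_transform()
    }
}

/// A fake sink: it ends the chain by answering every request itself (hypothetical).
struct EchoSink;

impl Transform for EchoSink {
    fn transform(&mut self, chain_state: &mut ChainState<'_>) -> Vec<Message> {
        chain_state
            .requests
            .drain(..)
            .map(|request| format!("OK: {request}"))
            .collect()
    }
}

/// Inserts the transforms into a fresh chain state and starts the first one.
fn process_request(chain: &mut [Box<dyn Transform>], requests: Vec<Message>) -> Vec<Message> {
    let mut chain_state = ChainState { requests, transforms: chain };
    chain_state.call_next_transform()
}
```

Because each transform calls the next one from inside its own transform method, the call stack grows by one entry per transform, which is what the call stack diagrams in the following sections depict.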
block-beta
block:main["Call Stack\n\n\n\n\n\n\n"]
columns 1
space
TransformChain
ValkeySource
IncomingConnectionTask
end
Some Transform
The first transform in the chain begins executing. Most transforms look something like this:
async fn transform<'shorter, 'longer: 'shorter>(
    &mut self,
    chain_state: &'shorter mut ChainState<'longer>,
) -> Result<Messages> {
    // iterate over all requests
    for request in &mut chain_state.requests {
        // each request is of type Message
        if let Some(Frame::Valkey(frame)) = request.frame() {
            // Calling `frame` on the request returns the parsed frame of the message.
            // This assertion is silly, but would pass for the example request we are working through
            assert_eq!(
                frame,
                ValkeyFrame::Array(vec![
                    ValkeyFrame::BulkString(Bytes::from_static(b"SET")),
                    ValkeyFrame::BulkString(Bytes::from_static(b"foo")),
                    ValkeyFrame::BulkString(Bytes::from_static(b"bar")),
                ])
            );

            // At this point the transform is able to read and or rewrite the request as it pleases.
            // But for this example we will assume that no rewriting occurs.
        }
    }

    let mut responses = chain_state.call_next_transform().await?;

    for response in responses.iter_mut() {
        // do something with the responses
    }

    // Hand the responses back to the previous transform in the chain.
    Ok(responses)
}
At the point where call_next_transform is called, the next transform in the chain is popped from the list in ChainState and executed.
Execution of this transform asynchronously waits until the request is completely sent.
In the case of ValkeySinkCluster (the sink transform used in this example) call_next_transform will also block until a response for each request has been received. But that is legacy behavior that the other transforms do not have. So we will pretend that is not the case for the rest of this document.
block-beta
block:main["Call Stack\n\n\n\n\n\n\n\n\n"]
columns 1
space
SomeTransform["Some Transform"]
TransformChain
ValkeySource
IncomingConnectionTask
end
Another Transform
Another transform is called.
This is the same as the previous section.
However this time it pops the final transform from the list of transforms and executes it. In this scenario the final transform is ValkeySinkCluster.
block-beta
block:main["Call Stack\n\n\n\n\n\n\n\n\n\n\n"]
columns 1
space
AnotherTransform["Another Transform"]
SomeTransform["Some Transform"]
TransformChain
ValkeySource
IncomingConnectionTask
end
ValkeySinkCluster sends request
The ValkeySinkCluster transform is quite complex so I will only describe it at a high level.
- For each request in ChainState
  - Determine how to route the request via RoutingInfo::for_command_frame. In this case, since we are routing a set with key of foo, we get RoutingInfo::Slot(hash_of(foo)).
  - Look up the computed slot value against the list of Valkey nodes to find which Valkey node should handle this slot.
  - Send the request to the Valkey node. A new outgoing connection is created if it does not exist yet.
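To make the slot lookup concrete, here is a minimal sketch of how a key maps to a slot in Valkey cluster: CRC16 (the XMODEM variant) of the key, modulo 16384. It is not taken from ValkeySinkCluster. The SlotRange type, node_for_slot, and the slot ranges used in the usage note are assumptions for illustration (a real cluster reports its own slot assignment), and hash tags ({...} inside keys) are ignored.

```rust
/// Number of hash slots in a Valkey cluster.
const SLOT_COUNT: u16 = 16384;

/// CRC16 with polynomial 0x1021 and initial value 0 (the XMODEM variant).
fn crc16_xmodem(data: &[u8]) -> u16 {
    let mut crc: u16 = 0;
    for &byte in data {
        crc ^= (byte as u16) << 8;
        for _ in 0..8 {
            crc = if crc & 0x8000 != 0 {
                (crc << 1) ^ 0x1021
            } else {
                crc << 1
            };
        }
    }
    crc
}

/// Slot for a key. Simplification: hash tags are not handled.
fn key_slot(key: &[u8]) -> u16 {
    crc16_xmodem(key) % SLOT_COUNT
}

/// Hypothetical routing table entry: an inclusive slot range owned by one node.
struct SlotRange {
    start: u16,
    end: u16,
    node: &'static str,
}

/// Find the node that owns `slot`, if any range covers it.
fn node_for_slot(ranges: &[SlotRange], slot: u16) -> Option<&'static str> {
    ranges
        .iter()
        .find(|range| (range.start..=range.end).contains(&slot))
        .map(|range| range.node)
}
```

For the key foo this gives slot 12182. If we assume the three nodes from the configuration split the slot space evenly (0-5460, 5461-10922, 10923-16383), the request would be routed to the third node.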
Other functionality of ValkeySinkCluster not listed above includes:
- fetching and managing the metadata required for routing requests.
- working in either cluster hiding or cluster handling mode. The topology.yaml configuration defined earlier used hiding mode, so that's how our request is handled.
block-beta
block:main["Call Stack\n\n\n\n\n\n\n\n\n\n\n\n"]
columns 1
space
ValkeySinkCluster["ValkeySinkCluster Transform"]
AnotherTransform["Another Transform"]
SomeTransform["Some Transform"]
TransformChain
ValkeySource
IncomingConnectionTask
end
SinkConnection send
The standard way to form an outgoing connection is with SinkConnection.
However, out of the 6 sink transforms that Shotover has currently, ValkeySinkCluster is the only sink transform not to use SinkConnection.
This is only for legacy reasons, so to give a better overview of Shotover, I'll be pretending that ValkeySinkCluster does actually use SinkConnection.
The SinkConnection type contains a single TCP connection and exposes an interface allowing the creator to send and receive Shotover Messages over the TCP connection.
When the SinkConnection is created it runs spawn_read_write_tasks which creates the tokio tasks for reading and writing to the outgoing connection.
In our scenario the transform called SinkConnection::send which sends a batch of requests to the writer task over a channel. The batch of requests contains just the one SET request.
The writer task then writes the message to FramedWrite which encodes the message to the TCP connection via ValkeyEncoder.
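The hand-off from the transform to the writer task can be sketched with standard library primitives. This is not SinkConnection itself: a thread stands in for the tokio task, std::sync::mpsc stands in for the tokio channel, a Vec<u8> stands in for the TCP socket, and messages are already-encoded bytes. SketchConnection and its methods are names invented for the example.

```rust
use std::io::Write;
use std::sync::mpsc;
use std::thread;

/// Stand-in for a Shotover `Message`: just the bytes to put on the wire.
type Message = Vec<u8>;

/// Hypothetical, heavily simplified outgoing connection.
struct SketchConnection {
    requests_tx: mpsc::Sender<Vec<Message>>,
    writer: thread::JoinHandle<Vec<u8>>,
}

impl SketchConnection {
    /// Spawn the writer. It owns the "socket" and is the only code writing to it.
    fn new() -> Self {
        let (requests_tx, requests_rx) = mpsc::channel::<Vec<Message>>();
        let writer = thread::spawn(move || {
            let mut socket: Vec<u8> = Vec::new();
            // Loops until every sender has been dropped.
            for batch in requests_rx {
                for message in batch {
                    socket.write_all(&message).expect("writing to a Vec cannot fail");
                }
            }
            socket
        });
        Self { requests_tx, writer }
    }

    /// Queue a batch of requests; returns without waiting for the write to happen.
    fn send(&self, batch: Vec<Message>) {
        self.requests_tx.send(batch).expect("writer has shut down");
    }

    /// Close the channel and return everything the writer wrote.
    fn close(self) -> Vec<u8> {
        drop(self.requests_tx);
        self.writer.join().expect("writer panicked")
    }
}
```

The point of the design is that send only pushes onto a channel, so the transform chain is never blocked on socket I/O.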
block-beta
block:main["Call Stack (transform side)\n\n\n\n\n\n\n\n\n\n\n\n\n\n"]
columns 1
space
SinkConnectionSend["SinkConnection::send"]
ValkeySinkCluster["ValkeySinkCluster Transform"]
AnotherTransform["Another Transform"]
SomeTransform["Some Transform"]
TransformChain
ValkeySource
IncomingConnectionTask
end
block-beta
block:main["Call Stack (write task side)\n\n\n\n\n\n\n"]
columns 1
space
ValkeyEncoder
FramedWriter
SinkConnectionWriteTask
end
Sink ValkeyEncoder
Earlier we talked about the Encoder and Decoder traits.
The ValkeyEncoder is an example of an Encoder implementation.
The logic for ValkeyEncoder::encode looks like:
- The into_encodable method is called on each request. This method returns the most efficient way to encode the request.
  - Encodable::Frame - If the message is marked as modified by the transforms, the parsed Valkey frame is returned and the encoder must reencode the bytes from the frame.
  - Encodable::Bytes - If the message is not marked as modified, the raw bytes are returned and the encoder can simply write the raw bytes to the socket, which is much faster.
In our example the request is unmodified so we take the fast path by directly writing the bytes.
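A minimal sketch of the two paths is shown below. The Message here is a toy with a modified flag that is set whenever mutable access to the frame is handed out; how Shotover actually tracks modification is not described in this document, so treat the modified field, frame_mut and encode_frame as assumptions made for the example.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Frame {
    Array(Vec<Frame>),
    BulkString(Vec<u8>),
}

#[derive(Debug, PartialEq)]
enum Encodable {
    /// The raw bytes as received: can be written out as-is.
    Bytes(Vec<u8>),
    /// The parsed frame: must be re-encoded.
    Frame(Frame),
}

struct Message {
    raw_bytes: Vec<u8>,
    frame: Frame,
    /// Hypothetical flag for this sketch.
    modified: bool,
}

impl Message {
    fn new(raw_bytes: Vec<u8>, frame: Frame) -> Self {
        Message { raw_bytes, frame, modified: false }
    }

    /// Handing out mutable access means the raw bytes can no longer be trusted.
    fn frame_mut(&mut self) -> &mut Frame {
        self.modified = true;
        &mut self.frame
    }

    fn into_encodable(self) -> Encodable {
        if self.modified {
            Encodable::Frame(self.frame)
        } else {
            Encodable::Bytes(self.raw_bytes)
        }
    }
}

/// Slow path: rebuild the wire format from the frame.
fn encode_frame(frame: &Frame, out: &mut Vec<u8>) {
    match frame {
        Frame::BulkString(bytes) => {
            out.extend_from_slice(format!("${}\r\n", bytes.len()).as_bytes());
            out.extend_from_slice(bytes);
            out.extend_from_slice(b"\r\n");
        }
        Frame::Array(items) => {
            out.extend_from_slice(format!("*{}\r\n", items.len()).as_bytes());
            for item in items {
                encode_frame(item, out);
            }
        }
    }
}

fn encode(message: Message, out: &mut Vec<u8>) {
    match message.into_encodable() {
        // Fast path: a single copy of the original bytes.
        Encodable::Bytes(bytes) => out.extend_from_slice(&bytes),
        Encodable::Frame(frame) => encode_frame(&frame, out),
    }
}
```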
You will recall that the original message was:
*3
$3
SET
$3
foo
$3
bar
That message is what is written out over TCP.
Additionally the sink ValkeyEncoder sends some metadata to its sink ValkeyDecoder counterpart.
Encoders and Decoders are always created in pairs allowing them to be assigned a shared channel at creation. This allows them to share state which is a requirement for working around stateful protocols.
The metadata sent to the ValkeyDecoder is a RequestInfo which tells the decoder:
- The request ID of the next response it will receive.
  - In our case the ID was originally set to 0xd12ac2704d19e53ef3fea94b4885c950
- The request type of the next response it will receive - Used to decide if ValkeyDecoder should enter pubsub mode.
  - In our case it is a simple SET request, so we are not entering pubsub mode.
block-beta
block:main["Call Stack\n\n\n\n\n\n\n"]
columns 1
space
ValkeyEncoder
FramedWriter
SinkConnectionWriteTask
end
Transform chain unwinds
Since the request has been sent on the TCP socket, the ValkeySinkCluster transform has no more work to do, so it returns.
In turn the other 2 transforms also return as they have completed.
Finally we get back to the ValkeySource which waits for either:
- A new request to come in from the client.
- A new response to come in from Valkey.
Asynchronously waiting for one of multiple events is achieved via a tokio select macro.
block-beta
block:main["Call Stack\n\n\n\n\n"]
columns 1
space
ValkeySource
IncomingConnectionTask
end
Valkey Instance
The request is received by Valkey.
Valkey modifies its internal value of foo to contain bar and then sends back a success response.
The success response is encoded in RESP as:
+OK
Sink ValkeyDecoder
Now the ValkeyDecoder is used again, but this time it's for decoding a response instead of a request. Parsing a response has a few extra complexities that we don't have to deal with when parsing a request.
As with the source ValkeyDecoder, the sink ValkeyDecoder follows the same steps:
- Receives the raw bytes of the response +OK
- Parses this into the frame ValkeyFrame::SimpleString("OK")
- Stores the raw bytes and frame of the response in a Message.
Then things deviate a bit.
- The RequestInfo from the sink ValkeyEncoder is received.
- The request ID field of the response Message is set to 0xd12ac2704d19e53ef3fea94b4885c950, the ID stored in the RequestInfo.
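The encoder/decoder pairing can be sketched like this. It is a simplified model rather than Shotover's codec: std::sync::mpsc replaces whatever channel the real pair shares, and SketchEncoder, SketchDecoder, RequestType, Response and codec_pair are hypothetical names. The sketch relies on responses arriving in the same order as the requests were sent, so the oldest queued RequestInfo always belongs to the next response.

```rust
use std::sync::mpsc;

#[derive(Debug, Clone, Copy, PartialEq)]
enum RequestType {
    Subscribe,
    Other,
}

struct RequestInfo {
    request_id: u128,
    request_type: RequestType,
}

struct SketchEncoder {
    request_info_tx: mpsc::Sender<RequestInfo>,
}

struct SketchDecoder {
    request_info_rx: mpsc::Receiver<RequestInfo>,
    pubsub_mode: bool,
}

struct Response {
    request_id: u128,
    raw_bytes: Vec<u8>,
}

/// Encoder and decoder are created together so they can share a channel.
fn codec_pair() -> (SketchEncoder, SketchDecoder) {
    let (request_info_tx, request_info_rx) = mpsc::channel();
    (
        SketchEncoder { request_info_tx },
        SketchDecoder { request_info_rx, pubsub_mode: false },
    )
}

impl SketchEncoder {
    /// Called as a request is written out: record what the decoder should expect.
    fn encode(&self, request_id: u128, request_type: RequestType) {
        self.request_info_tx
            .send(RequestInfo { request_id, request_type })
            .expect("decoder was dropped");
    }
}

impl SketchDecoder {
    /// Called once a full response has been parsed from the socket.
    fn decode(&mut self, raw_bytes: &[u8]) -> Response {
        let info = self
            .request_info_rx
            .try_recv()
            .expect("a response arrived without a pending request");
        if info.request_type == RequestType::Subscribe {
            self.pubsub_mode = true;
        }
        // Stamp the response with the ID of the request it answers.
        Response { request_id: info.request_id, raw_bytes: raw_bytes.to_vec() }
    }
}
```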
block-beta
block:main["Call Stack\n\n\n\n\n\n\n"]
columns 1
space
ValkeyDecoder
FramedReader
SinkConnectionReadTask
end
SinkConnection receive
The ValkeyDecoder is driven by the SinkConnection read task.
The task waits for a response message to be successfully parsed from the TCP socket. Once that occurs it will:
- Send the response to ValkeySinkCluster over the channel.
- Trigger the force_run_chain Notify to inform ValkeySource that it should run the transform chain as there is a pending response.
  - Notify is similar to a channel but carries no data. It is used only to await some kind of external event.
  - If we didn't notify the source about the pending response, the response would be stuck until the next request comes in, so force_run_chain allows us to trigger the transform chain early to prevent stuck responses.
block-beta
block:main["Call Stack\n\n\n"]
columns 1
space
SinkConnectionReadTask
end
Transform chain begins again
ValkeySource is notified of force_run_chain.
It calls the transform chain which calls the first transform, which calls the second transform, which calls ValkeySinkCluster.
The transform chain was called with 0 requests, which is what happens when a force_run_chain occurs when there are no pending requests.
In this case the transforms just iterate over the 0 requests in ChainState, resulting in nothing occurring.
Each transform then calls the next transform in the chain, until finally the ValkeySinkCluster transform is called.
block-beta
block:main["Call Stack\n\n\n\n\n\n\n\n\n\n\n\n"]
columns 1
space
ValkeySinkCluster["ValkeySinkCluster Transform"]
AnotherTransform["Another Transform"]
SomeTransform["Some Transform"]
TransformChain
ValkeySource
IncomingConnectionTask
end
ValkeySinkCluster receives responses
ChainState contains no requests, so none are sent.
However ValkeySinkCluster does find that one of the outgoing SinkConnections has a pending response.
So that connection is queried for responses and ValkeySinkCluster includes the response in its return value.
block-beta
block:main["Call Stack\n\n\n\n\n\n\n\n\n\n\n\n\n\n"]
columns 1
space
SinkConnectionRecv["SinkConnection::recv"]
ValkeySinkCluster["ValkeySinkCluster Transform"]
AnotherTransform["Another Transform"]
SomeTransform["Some Transform"]
TransformChain
ValkeySource
IncomingConnectionTask
end
The transform chain unwinds again
The transform chain unwinds again. However this time there is a response Message being returned by the called transform. Each transform now has an opportunity to inspect or rewrite the response.
Here's an example transform:
async fn transform<'shorter, 'longer: 'shorter>(
    &mut self,
    chain_state: &'shorter mut ChainState<'longer>,
) -> Result<Messages> {
    for request in &mut chain_state.requests {
        // do something with the requests
    }

    let mut responses = chain_state.call_next_transform().await?;

    for response in responses.iter_mut() {
        if let Some(Frame::Valkey(frame)) = response.frame() {
            // Calling `frame` on the response returns the parsed frame of the message.
            // This assertion is silly, but would pass for the example response we are working through
            assert_eq!(
                frame,
                ValkeyFrame::SimpleString(Bytes::from_static(b"OK"))
            );

            // Calling `request_id` on the response returns the ID that was assigned to the request that this response is responding to.
            // Transforms can correlate requests to responses by calling `.id()` on requests and `.request_id()` on responses and comparing the results.
            //
            // This assertion is also silly, the id would be different every time.
            // But in this scenario that we are working through the request did in fact have this ID.
            assert_eq!(response.request_id(), Some(0xd12ac2704d19e53ef3fea94b4885c950));
        }
    }

    // Hand the (possibly rewritten) responses back to the previous transform in the chain.
    Ok(responses)
}
Finally we get back to the ValkeySource which sends the response off to the source write task.
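Since the response arrives in a later run of the chain than its request, a transform that wants to correlate the two has to keep state between calls. The sketch below shows one way that could look. Request, Response and CommandLogger are simplified stand-ins invented for this example, with plain fields in place of the id() and request_id() methods on Message.

```rust
use std::collections::HashMap;

struct Request {
    id: u128,
    command: String,
}

struct Response {
    request_id: Option<u128>,
    body: String,
}

/// Hypothetical transform state: remembers each request until its response is seen.
#[derive(Default)]
struct CommandLogger {
    pending: HashMap<u128, String>,
}

impl CommandLogger {
    /// Called with the requests of a chain run (possibly none).
    fn on_requests(&mut self, requests: &[Request]) {
        for request in requests {
            self.pending.insert(request.id, request.command.clone());
        }
    }

    /// Called with the responses of a chain run (possibly none).
    /// Returns (command, response body) pairs for every response that could be matched.
    fn on_responses(&mut self, responses: &[Response]) -> Vec<(String, String)> {
        responses
            .iter()
            .filter_map(|response| {
                let command = self.pending.remove(&response.request_id?)?;
                Some((command, response.body.clone()))
            })
            .collect()
    }
}
```

In the scenario of this document, the first chain run would call on_requests with the SET request and on_responses with nothing, and the second run (triggered by force_run_chain) would call on_requests with nothing and on_responses with the OK response.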
block-beta
block:main["Call Stack\n\n\n\n\n"]
columns 1
space
ValkeySource
IncomingConnectionTask
end
Source ValkeyEncoder
Same as the sink write task, the source write task drives FramedWrite and ValkeyEncoder.
The source ValkeyEncoder is the same implementation as the sink ValkeyEncoder:
- into_encodable is called and then the returned value is encoded.
- However the source ValkeyDecoder does not need to worry about the request IDs or pubsub state, so there is no metadata sent to the source ValkeyDecoder.
Since the response is not modified at all, the raw bytes are written to the TCP socket:
+OK
block-beta
block:main["Call Stack\n\n\n\n\n\n\n"]
columns 1
space
ValkeyEncoder
FramedWriter
IncomingWriteTask
end
The client receives response
The client receives the raw bytes +OK from the socket, parses it into "OK" and returns control back to the user since the request has succeeded.
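For completeness, the client-side parsing of the response can be sketched in a few lines. This assumes the full reply, including its \r\n terminator, is already in the buffer; parse_simple_string is a made-up helper, not a function from any real client library.

```rust
/// Parse a complete RESP simple string such as `+OK\r\n` into its text.
/// Returns `None` for anything else (other reply types, missing terminator, invalid UTF-8).
fn parse_simple_string(bytes: &[u8]) -> Option<&str> {
    let body = bytes.strip_prefix(b"+")?.strip_suffix(b"\r\n")?;
    std::str::from_utf8(body).ok()
}
```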