Async / Tokio
Background causal inference on a Tokio runtime
An event handler runs a causal model on a Tokio task while the main loop stays free.
The full crate lives at examples/tokio_example/. The whole thing is five source files plus a Cargo.toml; the main function is twelve lines on purpose. The point is to show how DeepCausality's synchronous inference plugs into an asynchronous runtime without ceremony.
File map
examples/tokio_example/
├── Cargo.toml
└── src/
    ├── main.rs     # Tokio entry point + task spawn
    ├── handler.rs  # EventHandler: holds the model, runs inference per event
    ├── model.rs    # build_causal_model() + causaloid + context
    ├── types.rs    # type alias for the parameterized Model
    └── utils.rs    # synthetic input data
Five files. None of them is longer than fifty lines. The architecture is small enough to read end to end in five minutes.
The Tokio entry point — src/main.rs
use crate::handler::EventHandler;
use crate::model::build_causal_model;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let event_handler = EventHandler::new(build_causal_model());

    tokio::spawn(async move {
        if let Err(e) = event_handler.run_background_inference().await {
            eprintln!("inference error: {e}");
        }
    })
    .await
    .expect("Failed to spawn async background task");

    Ok(())
}
main.rs:18 onward.
Three moves in order. First, build_causal_model() constructs the inference model: a Causaloid with a context, wrapped in DeepCausality’s Model type. Second, the EventHandler takes ownership of that model under an Arc<RwLock<…>> so it can be shared across tasks. Third, tokio::spawn runs the handler on a background task; the main task awaits its completion so the program exits cleanly when inference is done.
In a real service the spawned task is long-lived. The .await at the end of main becomes a graceful-shutdown handle, and the handler reads events off a channel rather than from a fixed test array.
The model — src/model.rs
pub fn build_causal_model() -> BaseModelTokio {
    let id = 1;
    let author = "Marvin Hansen <marvin.hansen@gmail.com>";
    let assumptions = None;
    let causaloid = Arc::new(get_test_causaloid());
    let context = Some(Arc::new(RwLock::new(get_test_context())));
    let description = "This is a test causal model for the Tokio async runtime";

    Model::new(id, author, description, assumptions, causaloid, context)
}
model.rs:13. A Model ties three things together: a Causaloid (the rule), a Context (the environment), and metadata (id, author, description, assumptions). The assumptions and the context are optional; the Causaloid is required. Wrapping the Context in Arc<RwLock<_>> is what makes it safe to share across Tokio tasks; multiple readers can hold the lock at once and a writer can take it when the environment needs updating.
The Causaloid itself is a small predicate over a single NumericalValue:
pub fn get_test_causaloid() -> BaseCausaloid<NumericalValue, bool> {
    let id: IdentificationValue = 1;
    let description = "tests whether data exceeds threshold of 0.75";

    fn causal_fn(obs: NumericalValue) -> PropagatingEffect<bool> {
        if obs.is_sign_negative() {
            return PropagatingEffect::from_error(CausalityError(CausalityErrorEnum::Custom(
                "Observation is negative".into(),
            )));
        }
        let threshold: NumericalValue = 0.75;
        let is_active = obs.ge(&threshold);
        PropagatingEffect::pure(is_active)
    }

    Causaloid::new(id, causal_fn, description)
}
model.rs:24. Three properties of this function are worth pointing at.
It is a free function, not a closure. That matters because Causaloid::new takes a function pointer; you can swap one rule for another by name without rebuilding any state.
It returns PropagatingEffect<bool>, never a bare Result. The error path (negative observation) goes into the propagating effect’s error field via from_error. Downstream code sees the same shape whether the rule succeeded or failed, which is what keeps the chain composable.
The context construction lives in get_test_context just below. It builds a tiny BaseContext with a single Root Contextoid; the model carries it but does not yet consult it. A production rule would replace causal_fn with a ContextualCausalFn that reads from the context every call.
The handler — src/handler.rs
pub struct EventHandler {
    model: Arc<RwLock<BaseModelTokio>>,
}

impl EventHandler {
    pub fn new(model: BaseModelTokio) -> Self {
        Self { model: Arc::new(RwLock::new(model)) }
    }
}

impl EventHandler {
    pub async fn run_background_inference(&self) -> Result<(), Box<dyn Error + Send>> {
        let data = utils::get_test_data();
        let causaloid = {
            let model = self.model.read().unwrap();
            Arc::clone(model.causaloid())
        };
        for d in data.into_iter() {
            self.handle_inference(d, &causaloid)?
        }
        Ok(())
    }
}
handler.rs:11 onward.
The handler owns the model under Arc<RwLock<…>>. The model’s Causaloid is itself an Arc, so the handler clones the inner Arc (cheap) before processing and immediately drops the outer RwLock read guard. That guarantees the lock is held only for the few microseconds it takes to clone the pointer, not for the whole inference loop.
The inference call is the actual work:
fn handle_inference(
    &self,
    data: f64,
    bc: &BaseCausaloid<NumericalValue, bool>,
) -> Result<(), Box<dyn Error + Send>> {
    let input_effect: PropagatingEffect<NumericalValue> = PropagatingEffect::pure(data);
    let res = bc.evaluate(&input_effect);

    if res.is_ok() {
        let value = res.value.into_value().unwrap_or(false);
        println!("EventHandler: Inference successful with res: {}", value)
    } else {
        println!("EventHandler: Inference failed with error: {}", res.error.unwrap())
    }

    Ok(())
}
handler.rs:45. One value in, one effect out. The error case branches on res.is_ok() and reaches into res.error for the failure reason; the success case pulls the boolean out of res.value. No ? propagation, no async hops, no awaits. The inference is synchronous; the asynchrony is at the runtime boundary, not inside it.
The data — src/utils.rs
const N: usize = 10;

pub fn get_test_data() -> [f64; N] {
    [0.99; N]
}
utils.rs:6. Ten copies of 0.99. The threshold is 0.75. Every observation should fire, so the program prints ten consecutive “Inference successful” lines. In a real service the data source is a Kafka consumer, a database tailer, or an HTTP request body; the type signature does not change.
Run it
git clone https://github.com/deepcausality-rs/deep_causality
cd deep_causality
cargo run --release -p tokio_example
Expected output: a “Build Event Handler” line, a “Start the data handler” line, then ten “Inference successful with res: true” lines, then the program exits.
Where to take it next
Three modifications turn this into a working service skeleton.
Replace utils::get_test_data with a tokio::sync::mpsc::Receiver and rewrite run_background_inference as a while let Some(event) = rx.recv().await { … } loop. Now the handler processes one event per channel message and the main task can push messages from any source.
Make the rule contextual. Swap Causaloid::new(id, causal_fn, description) for Causaloid::new_contextual(id, contextual_causal_fn, description, context_arc). The closure receives the input and a reference to the BaseContext, so the threshold becomes a Contextoid the operator can update at runtime without restarting the service.
Persist the audit log. Every evaluate call returns a PropagatingEffect whose logs field carries an EffectLog. Push that log to wherever your audit pipeline lives (a database, a Kafka topic, a structured log sink); the trace of which Causaloid fired with what input is already there.
Why this is a good first example
The crate is small, complete, and exercises the integration most production services need on day one: a causal model running asynchronously off a runtime, behind a shared-state lock that does not become a bottleneck. Nothing about the inference shape changes when you scale the data source up. The same Causaloid::evaluate call works for one event or a million.