Transform streaming data in real-time with WebAssembly

by Nick Mosher (@nicholastmosher)
| August 11, 2021
| Reading time: 4 minutes

Fluvio is a high-performance, distributed, programmable streaming platform for real-time data. We’ve been hard at work building new capabilities for inline data processing, a family of features that we call SmartStreams, and with our latest major release we announced our new SmartStream Map functionality. This feature allows users to write custom code to inspect and transform each record of data in a stream. Users write SmartStream modules in Rust and compile them to WebAssembly, and the modules are ultimately executed in the Fluvio cluster on a Streaming Processing Unit (SPU).


Thinking about Mapping capabilities and use-cases

When we’re thinking about Mapping, we’re thinking about a function that takes one record as input and gives another record as output. With SmartStream Map, we are writing functions that run inside a WebAssembly sandbox, so these functions do not have access to the outside world, and cannot do certain things such as make network requests or write data to disk. In essence, reading and manipulating the input record is the only thing that a Map function can do.
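To make the "one record in, one record out" shape concrete, here is a minimal sketch in plain Rust. The `Record` struct and the uppercase transformation are hypothetical stand-ins chosen for illustration, not Fluvio's actual types; the point is only that the function reads its argument and returns a new value, with no I/O of any kind.

```rust
/// Hypothetical stand-in for a stream record: an optional key plus a value.
/// (Not the Fluvio type; defined here so the sketch is self-contained.)
#[derive(Debug, Clone, PartialEq)]
struct Record {
    key: Option<Vec<u8>>,
    value: Vec<u8>,
}

/// A Map in the sense described above: one record in, one record out.
/// It only reads its argument; it opens no files and makes no network calls.
fn map(record: &Record) -> Record {
    Record {
        // Keep the key as-is
        key: record.key.clone(),
        // Transform the value (here: ASCII-uppercase it)
        value: record.value.to_ascii_uppercase(),
    }
}

fn main() {
    let input = Record {
        key: Some(b"k1".to_vec()),
        value: b"hello".to_vec(),
    };
    let output = map(&input);
    println!("{}", String::from_utf8_lossy(&output.value));
}
```

Because the function depends on nothing but its input, it can be tested and reasoned about in isolation, which is the same property the WebAssembly sandbox enforces.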

This helps to set the stage for what types of operations we can perform using SmartStream Maps. I tend to think of these operations in the following broad categories (though there are certainly more):

  • Scrubbing sensitive fields of data to hide from downstream consumers
  • Narrowing large records into a smaller subset of important fields
  • Computing rich, derived fields from simple raw data
  • Parsing unstructured (e.g. textual) data into a structured form (e.g. JSON)
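As an illustration of the last category, here is a rough sketch of the core of a "parse text into JSON" transformation, written with only the Rust standard library. The `key=value` input format and the helper names `escape_json` and `line_to_json` are assumptions made up for this example; in an actual SmartStream, logic like this would sit inside the Map function and operate on the record's value.

```rust
/// Escape the two characters that would break a JSON string literal here.
/// (A fuller implementation would also escape control characters.)
fn escape_json(s: &str) -> String {
    s.replace('\\', "\\\\").replace('"', "\\\"")
}

/// Turn a line such as `event=login user=alice` into a flat JSON object
/// whose values are all strings. Tokens without an `=` are skipped.
fn line_to_json(line: &str) -> String {
    let fields: Vec<String> = line
        .split_whitespace()
        .filter_map(|token| token.split_once('='))
        .map(|(k, v)| format!("\"{}\":\"{}\"", escape_json(k), escape_json(v)))
        .collect();
    format!("{{{}}}", fields.join(","))
}

fn main() {
    // prints {"event":"login","user":"alice"}
    println!("{}", line_to_json("event=login user=alice"));
}
```

The other categories follow the same pattern: read the input value, build a new value, and return it.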

Let’s narrow in and explore how we can use SmartStreams to solve a “scrubbing sensitive fields” use-case.


Concrete use-case: Scrubbing Social Security Numbers from account records

Let’s imagine that we’re working with a banking account system, and we have a stream of data that represents account activity. Events in this stream might represent new accounts being created, passwords being changed, or personal info being updated. Suppose we want to write an application which sends a welcome email to new account owners after they sign up. We can structure our email application to consume from the accounts topic and send an email each time an account-created event appears.

So far so good, but let’s add a twist. Since we’re talking about bank accounts, the events contain some private information, such as the account holder’s Social Security Number. We would like to edit the records in our accounts stream and scrub out SSNs so that the email application never even has access to that data. That way, there’s no chance that a bug or a compromise in the email application could lead to disclosing this private information.

Alright, so let’s get concrete. We’ll set up our event schema so that records in our stream look something like this:

{
  "social_security_number": "123-45-6789",
  "event_type": "account-created",
  "account_id": "1509aaf8-5863-4b41-bfe2-b081691d7a6e",
  "first_name": "Daniel",
  "last_name": "Mahoney",
  "email": "",
  "password_hash": "db6b535bc9909ecfb7c2ee4550ed7b350a61785e"
}

After we’re done scrubbing, we want our records to look more like this:

{
  "social_security_number": "***-**-****",
  "event_type": "account-created",
  "account_id": "1509aaf8-5863-4b41-bfe2-b081691d7a6e",
  "first_name": "Daniel",
  "last_name": "Mahoney",
  "email": "",
  "password_hash": "db6b535bc9909ecfb7c2ee4550ed7b350a61785e"
}

Let’s take a look at a SmartStream Map implementation that could solve our use-case:

use regex::Regex;
use once_cell::sync::Lazy;
use fluvio_smartstream::{smartstream, Result, Record, RecordData};

// A compiled Regex for detecting SSNs that look like XXX-XX-XXXX
static SSN_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\d{3}-\d{2}-\d{4}").unwrap());

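// The attribute registers this function as the SmartStream's Map entry point
#[smartstream(map)]
pub fn map(record: &Record) -> Result<(Option<RecordData>, RecordData)> {
    // Pass the record's key through unchanged
    let key = record.key.clone();

    // Interpret the value as UTF-8 text and mask anything shaped like an SSN
    let string = std::str::from_utf8(record.value.as_ref())?;
    let output = SSN_RE.replace_all(string, "***-**-****").to_string();

    Ok((key, output.into()))
}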

That’s it. That’s the whole SmartStream.

Of course, this is an over-simplified solution to a toy problem, but it illustrates how we can quickly manipulate the data in our stream with relatively little effort. In this SmartStream, we’re using a regular expression to detect any string that has digits in the typical SSN layout and replace it with the meaningless substitute string ***-**-****. Another potential solution to this problem would be to parse the JSON and delete the social_security_number field, but for our purposes this strategy was simpler.
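To see what the pattern match is doing without pulling in the regex or Fluvio crates, here is a std-only sketch of the same scrubbing logic. The function names `looks_like_ssn` and `scrub_ssns` are hypothetical, and unlike the regex crate's `\d` (which by default also matches non-ASCII digits), this version only recognizes ASCII digits.

```rust
/// Returns true if `window` is exactly 11 bytes shaped like DDD-DD-DDDD.
fn looks_like_ssn(window: &[u8]) -> bool {
    window.len() == 11
        && window.iter().enumerate().all(|(i, b)| match i {
            3 | 6 => *b == b'-',
            _ => b.is_ascii_digit(),
        })
}

/// Replace every DDD-DD-DDDD run in `input` with ***-**-****,
/// scanning left to right and never overlapping matches.
fn scrub_ssns(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut out = String::with_capacity(input.len());
    let mut i = 0;
    while i < bytes.len() {
        if i + 11 <= bytes.len() && looks_like_ssn(&bytes[i..i + 11]) {
            out.push_str("***-**-****");
            i += 11;
        } else {
            // Copy one whole UTF-8 character so multi-byte text survives.
            let ch = input[i..].chars().next().unwrap();
            out.push(ch);
            i += ch.len_utf8();
        }
    }
    out
}

fn main() {
    let record = "{\"social_security_number\": \"123-45-6789\"}";
    println!("{}", scrub_ssns(record));
}
```

Note that this approach masks anything with the right shape, wherever it appears: the input `id 1234-56-7890` becomes `id 1***-**-****`. That is acceptable for a toy example, but it is one reason a pattern this simple would need refinement before real use.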

For completeness on this first example, here is an example session showing some data being processed by this SmartStream:

  1. First, create a new topic for us to produce and consume our data
$ fluvio topic create accounts
topic "accounts" created
  2. Show the contents of a file with sample input data
$ cat account.json
  3. Produce the sample input data to our topic
$ fluvio produce accounts < account.json
  4. Consume from our topic after applying our SmartStream Map
$ fluvio consume accounts -B --map=src/smartstream/examples/target/wasm32-unknown-unknown/release/fluvio_wasm_map_regex.wasm
Consuming records from the beginning of topic 'accounts'

If we were deploying this to production, we would obviously want a much more intelligent function for detecting and scrubbing sensitive data, but even with this simplified example it’s clear that the consumer never witnesses any of the SSN information.

That’s an important point, so I want to restate it:

The consumer never witnesses any of the SSN information

That’s because SmartStream code is executed in the Fluvio cluster, before it even touches the network on the way to the consumer. Having this ability to perform server-side data processing opens up a whole new world of interesting possible applications, and we’re excited to see what new use-cases the community discovers.



That’s it for this post. Be sure to join the discussion on Reddit or hop into our Discord server if you want to talk to us or have any questions. Until next time!

