Stop Babysitting Infrastructure. Ship Event-Driven Data Flows Confidently.
Build and operate programmable data flows that continuously transform, enrich, and materialize data from edge devices, sensors, and logs.
apiVersion: 0.1.0
meta:
  version: 0.2.5
  name: helsinki-mqtt
  type: mqtt-source
  topic: helsinki
mqtt:
  url: "mqtt://mqtt.hsl.fi"
  topic: "/hfp/v2/journey/ongoing/vp/+/+/+/#"
  timeout:
    secs: 30
    nanos: 0
  payload_output_type: json
transforms:
  - uses: infinyon/[email protected]
    with:
      spec:
        - operation: shift
          spec:
            payload:
              VP:
                veh: "vehicle"
                spd: "speed"
  - uses: infinyon/[email protected]
    with:
      mapping:
        table: "speed"
        map-columns:
          "vehicle":
            json-key: "vehicle"
            value:
              type: "int"
              required: true
          "speed":
            json-key: "speed"
            value:
              type: "float"
              required: true
version: 0.1.0
meta:
  name: dedup-bus-events
deduplication:
  bounds:
    count: 1000000
    age: 24h
  filter:
    transform:
      uses: infinyon-labs/[email protected]
apiVersion: 0.1.0
meta:
  name: avg-vehicle-last-5-minutes
materialize:
  window:
    type: tumbling
    bound: "5min"
  compute:
    uses: "infinyon-labs/[email protected]"
    with:
      group_by:
        - vehicle_id
      operation:
        type: average
        param: speed
        result: avg_speed
apiVersion: 0.1.0
meta:
  version: 0.1.0
  name: md-helsinki
  type: duckdb-sink
  topic: helsinki
  secrets:
    - name: MD_TOKEN
duckdb:
  url: "md:?token=${{ secrets.MD_TOKEN }}"
Increase returns on data investments
- Reduce Time and Complexity: Tired of babysitting a bunch of tools and infrastructure to build basic data pipelines? So are we!
- Improve Development Experience: Work on analytics, insights, and intelligence instead of endlessly debugging failed jobs and lost data.
- Deliver Continuous Data Flows: Deliver truly event-driven, continuous data enrichment to data consumers and intelligence to your customers.
Technology Leaders Experience
Stuck with a bloated data stack?
Realize the benefits of a unified data platform!
What data practitioners say
- Artem, Data Platform Architect: "I would love a unified solution without the memory limitations of JVM based tools."
- Joao, Cloud Infrastructure Architect: "It was awesome to see the CLI experience. That's like a million times better than Kafka. I think it's uncomfortable how much better it is than the Kafka."
- Jowanza, Data Practitioner, CEO, Author: "The event based approach without babysitting a bunch of point solutions is the way I want to build."
Building blocks of reliable data pipelines
Collect your data with built-in connectors, webhooks, and IoT collectors, or build your own custom clients.
apiVersion: 0.1.0
meta:
  version: 0.2.5
  name: http-github
  type: http-source
  topic: github-events
  secrets:
    - name: GITHUB_TOKEN
http:
  endpoint: 'https://api.github.com/repos/infinyon/fluvio'
  method: GET
  headers:
    - 'Authorization: token ${{ secrets.GITHUB_TOKEN }}'
  interval: 30s
apiVersion: 0.1.0
meta:
  version: 0.2.5
  name: helsinki-mqtt
  type: mqtt-source
  topic: helsinki
mqtt:
  url: "mqtt://mqtt.hsl.fi"
  topic: "/hfp/v2/journey/ongoing/vp/+/+/+/#"
  timeout:
    secs: 30
    nanos: 0
  payload_output_type: json
apiVersion: 0.1.0
meta:
  name: my-stripe-webhook
  topic: payment_confirmations
  secrets:
    - name: stripe_secret_key
webhook:
  verification:
    - uses: infinyon/[email protected]
      with:
        webhook_key: ${{ secrets.stripe_secret_key }}
## Cloud Collector
$ fluvio topic create collector-topic --mirror
$ fluvio topic collector-topic --mirror-add 6001
$ fluvio cloud register cluster --id 6001 --source-ip 35.42.12.192
$ fluvio cloud cluster metadata export --topic local-topic \
--mirror-spu 6001 --file remote-6001.toml
## IoT Sensor (ARMv7, ARM64, etc.)
$ fluvio cluster start --local --config remote-6001.toml
## Produce at Edge
$ fluvio produce local-topic
> test from 6001
## Read from Cloud
$ fluvio consume collector-topic -B
test from 6001
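use fluvio::{Fluvio, RecordKey};

#[async_std::main]
async fn main() {
    let topic = "rust-topic";
    let record = "Hello from rust!";
    // Connect to the cluster configured in the active Fluvio profile
    let fluvio = Fluvio::connect().await.unwrap();
    // Create a producer for the topic on this connection
    let producer = fluvio.topic_producer(topic).await.unwrap();
    // Send one record without a key, then flush buffered records before exiting
    producer.send(RecordKey::NULL, record).await.unwrap();
    producer.flush().await.unwrap();
}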
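#!/usr/bin/env python
from fluvio import Fluvio

if __name__ == "__main__":
    topic = "python-topic"
    record = "Hello from python!"
    # Connect using the active Fluvio profile and create a topic producer
    fluvio = Fluvio.connect()
    producer = fluvio.topic_producer(topic)
    # Send one string record and flush before exiting
    producer.send_string(record)
    producer.flush()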
import Fluvio from "@fluvio/client";

const produce = async () => {
  const TOPIC_NAME = "node-topic";
  const RECORD_TEXT = "Hello from node!";
  await fluvio.connect();
  const producer = await fluvio.topicProducer(TOPIC_NAME);
  await producer.send(RECORD_TEXT);
};

const fluvio = new Fluvio();
produce();
Apply transformations in-line as your data traverses the pipelines. Use pre-built SmartModules from the Hub or build your own with SMDK.
# Filter out all records that fail to match `Cat` or `cat`
transforms:
  - uses: infinyon/[email protected]
    with:
      regex: "[Cc]at"
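As a rough sketch of what the regex filter above does to a stream, the snippet below keeps only records whose text contains a match for `[Cc]at` and drops the rest. It assumes records are UTF-8 strings and that matching is a substring search (Python's `re.search` stands in for the SmartModule's matcher); `regex_filter` is a hypothetical helper, not part of the Fluvio API.

```python
import re
from typing import Iterable, List


def regex_filter(records: Iterable[str], pattern: str = "[Cc]at") -> List[str]:
    """Keep records containing a match for `pattern`; drop the rest (hypothetical helper)."""
    compiled = re.compile(pattern)
    # search() matches anywhere in the string, so "concatenate" also passes
    return [record for record in records if compiled.search(record)]
```

For example, `regex_filter(["Cat nap", "dog walk", "concatenate"])` keeps `"Cat nap"` and `"concatenate"`, since the pattern is not anchored.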
# JOLT: JSON Language for Transformations
transforms:
  - uses: infinyon/[email protected]
    with:
      spec:
        - operation: remove
          spec:
            id: ""
        - operation: shift
          spec:
            "*": "data.&0"
        - operation: default
          spec:
            data:
              source: "http-connector"
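To make the three JOLT operations above concrete, here is a hand-written Python emulation of this one spec: `remove` drops the top-level `id`, `shift` with `"*": "data.&0"` nests every remaining top-level key under `data`, and `default` fills in `data.source` only when it is missing. This is a sketch of the spec's effect under those assumptions, not a general JOLT engine; `apply_jolt_example` is a hypothetical name.

```python
from typing import Any, Dict


def apply_jolt_example(record: Dict[str, Any]) -> Dict[str, Any]:
    """Emulate the remove -> shift -> default spec shown above (hypothetical helper)."""
    # 1. remove: drop the top-level "id" field if present
    remaining = {key: value for key, value in record.items() if key != "id"}
    # 2. shift "*": "data.&0": move each remaining top-level key under "data"
    shifted: Dict[str, Any] = {"data": dict(remaining)}
    # 3. default: add data.source only when it is not already set
    shifted["data"].setdefault("source", "http-connector")
    return shifted
```

With this sketch, `{"id": 1, "name": "Fluvio"}` becomes `{"data": {"name": "Fluvio", "source": "http-connector"}}`, while a record that already carries a `source` keeps its own value.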
# Transform JSON records into SQL statements (used with sql-sink)
transforms:
  - uses: infinyon/[email protected]
    with:
      mapping:
        table: "target_table"
        map-columns:
          "device_id":
            json-key: "device.device_id"
            value:
              type: "int"
              required: true
          "record":
            json-key: "$"
            value:
              type: "jsonb"
              required: true
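The mapping above reads as: for each column, pull the value at `json-key` (a dotted path, with `$` meaning the whole record), coerce it to the declared type, and reject the record if a `required` column is missing. The sketch below turns one JSON record into a parameterized `INSERT` under exactly those assumptions. It illustrates the idea only; the SmartModule's actual output format and type handling may differ, and `lookup`, `to_insert`, and `MAPPING` are hypothetical names.

```python
import json
from typing import Any, Dict, List, Tuple

# Python mirror of the YAML mapping above
MAPPING: Dict[str, Any] = {
    "table": "target_table",
    "map-columns": {
        "device_id": {"json-key": "device.device_id", "value": {"type": "int", "required": True}},
        "record": {"json-key": "$", "value": {"type": "jsonb", "required": True}},
    },
}


def lookup(record: Dict[str, Any], json_key: str) -> Any:
    """Resolve a dotted path; "$" stands for the whole record (assumed semantics)."""
    if json_key == "$":
        return record
    value: Any = record
    for part in json_key.split("."):
        if not isinstance(value, dict) or part not in value:
            return None
        value = value[part]
    return value


def to_insert(record: Dict[str, Any], mapping: Dict[str, Any]) -> Tuple[str, List[Any]]:
    """Build a parameterized INSERT statement and its parameters for one record."""
    columns: List[str] = []
    params: List[Any] = []
    for column, spec in mapping["map-columns"].items():
        value = lookup(record, spec["json-key"])
        if value is None:
            if spec["value"].get("required"):
                raise ValueError(f"missing required column {column!r}")
            continue
        kind = spec["value"]["type"]
        if kind == "int":
            value = int(value)
        elif kind == "jsonb":
            value = json.dumps(value)  # store the JSON document as text
        columns.append(column)
        params.append(value)
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {mapping['table']} ({', '.join(columns)}) VALUES ({placeholders})"
    return sql, params
```

For `{"device": {"device_id": 7}}` this yields `INSERT INTO target_table (device_id, record) VALUES (?, ?)` with parameters `[7, '{"device": {"device_id": 7}}']`.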
# Look at the last 10 records.
# Filter out records based on your custom business logic.
transforms:
  - uses: infinyon/[email protected]
    lookback:
      last: 10
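As we understand it, `lookback` replays the most recent records of the topic into the SmartModule before it processes new ones, so a filter can rebuild state instead of starting cold. The sketch below models that in plain Python with a bounded window seeded from history. The business rule (pass a reading only if it beats every value in the window) and the names `NewHighFilter`, `look_back`, and `filter` are hypothetical and chosen only for illustration.

```python
from collections import deque
from typing import Deque, Iterable


class NewHighFilter:
    """Hypothetical filter: pass a reading only if it beats every value in the recent window."""

    def __init__(self, last: int = 10) -> None:
        # Bounded window: only the `last` most recent values are kept
        self.window: Deque[float] = deque(maxlen=last)

    def look_back(self, history: Iterable[float]) -> None:
        # Seed state from records already in the topic (oldest first)
        for value in history:
            self.window.append(value)

    def filter(self, value: float) -> bool:
        keep = not self.window or value > max(self.window)
        self.window.append(value)  # the new record joins the lookback window
        return keep
```

With `last=3` and a history of `[1.0, 5.0, 2.0, 3.0]`, only `[5.0, 2.0, 3.0]` is retained, so a new reading of `4.0` is filtered out while a following `4.5` passes.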
## Use smartmodule developer kit (smdk) to build your own.
$ fluvio install smdk
## Create your SmartModule project
$ smdk generate
🤷 Project Name: my-smartmodule
🤷 Which type of SmartModule would you like?
❯ filter
map
filter-map
array-map
aggregate
## Build & Test
$ smdk build
$ smdk test
## Load to Cluster
$ smdk load
## Publish to Hub
$ smdk publish
Apply deduplication for exactly-once semantics. Choose our certified pre-built algorithms or build your own.
# Deduplication configuration file
# - topic-config.yaml
name: dedup
deduplication:
  bounds:
    count: 1000000
    age: 1w
  filter:
    transform:
      uses: infinyon-labs/[email protected]
# apply dedup configuration
$ fluvio topic apply --config topic-config.yaml
# produce
$ fluvio produce dedup --key-separator ":"
1: one
1: one
2: two
# consume
$ fluvio consume dedup -B -k
1: one
2: two
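The `bounds` settings above can be read as: remember each key for at most `count` entries and at most `age` of time, and drop any record whose key is still remembered. The following is a minimal in-memory sketch of that idea, assuming deduplication is by record key and that timestamps never go backwards. It is not the `dedup-filter` SmartModule's implementation, and `BoundedDedup` is a hypothetical name.

```python
from collections import OrderedDict
from typing import Hashable


class BoundedDedup:
    """Key-based dedup bounded by entry count and entry age (hypothetical sketch)."""

    def __init__(self, count: int, age_secs: float) -> None:
        self.count = count
        self.age_secs = age_secs
        # key -> first-seen time; insertion order is also time order
        self.seen: "OrderedDict[Hashable, float]" = OrderedDict()

    def _evict(self, now: float) -> None:
        # Age bound: drop entries older than age_secs, oldest first
        while self.seen:
            _, first_seen = next(iter(self.seen.items()))
            if now - first_seen <= self.age_secs:
                break
            self.seen.popitem(last=False)
        # Count bound: drop the oldest entries beyond `count`
        while len(self.seen) > self.count:
            self.seen.popitem(last=False)

    def accept(self, key: Hashable, now: float) -> bool:
        """Return True if the record is new and should be kept."""
        self._evict(now)
        if key in self.seen:
            return False
        self.seen[key] = now
        self._evict(now)
        return True
```

Feeding it the keys from the produce example (`1`, `1`, `2`) keeps the first record, drops the repeat, and keeps `2`, matching the consumer output shown above. Once a key falls outside either bound, the same key is accepted again.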
Materialized views are in early development and the interfaces presented here are subject to change. Join us on Discord to give us feedback.
# Materialized view configuration file
# - mv.yaml
apiVersion: 0.1.0
meta:
  name: avg_bus_speed
materialize:
  window:
    type: tumbling
    bound: "5min"
    trigger: events
  input:
    topic: helsinki
    schema:
      - name: vehicle_id
        type: u16
      - name: speed
        type: float
  output:
    fields:
      - name: vehicle_id
      - name: avg_speed
  compute:
    uses: "infinyon-labs/[email protected]"
    with:
      group_by:
        - vehicle_id
      operation:
        type: average
        param: speed
        result: avg_speed
# create materialized view
$ fluvio materialized-view create --config mv.yaml
# view materialized view current state
$ fluvio state avg_bus_speed
[
  {
    "vehicle_id": "123",
    "avg_speed": 10.0
  },
  {
    "vehicle_id": "456",
    "avg_speed": 35.0
  }
]
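The `window` and `compute` settings above describe a 5-minute tumbling window that averages `speed` per `vehicle_id`. The batch sketch below shows that computation, assuming each event carries an event-time timestamp in seconds and that windows are aligned to multiples of 300 seconds. The real view is computed incrementally, and its window alignment and trigger semantics may differ. `tumbling_avg` is a hypothetical name.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

WINDOW_SECS = 5 * 60  # "5min" tumbling window


def tumbling_avg(events: Iterable[Tuple[float, int, float]]) -> Dict[int, List[dict]]:
    """Average speed per vehicle for each non-overlapping 5-minute window.

    `events` are (timestamp_secs, vehicle_id, speed) tuples.
    Returns {window_start_secs: [{"vehicle_id": ..., "avg_speed": ...}, ...]}.
    """
    totals: Dict[Tuple[int, int], float] = defaultdict(float)
    counts: Dict[Tuple[int, int], int] = defaultdict(int)
    for ts, vehicle_id, speed in events:
        # Each event belongs to exactly one window: [start, start + WINDOW_SECS)
        window_start = int(ts // WINDOW_SECS) * WINDOW_SECS
        totals[(window_start, vehicle_id)] += speed
        counts[(window_start, vehicle_id)] += 1
    out: Dict[int, List[dict]] = defaultdict(list)
    for key in sorted(totals):
        window_start, vehicle_id = key
        out[window_start].append(
            {"vehicle_id": vehicle_id, "avg_speed": totals[key] / counts[key]}
        )
    return dict(out)
```

Because tumbling windows do not overlap, an event at second 310 starts a fresh window at 300 instead of being averaged with the events from the first five minutes.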
Send data to multiple destinations with built-in certified connectors, your own custom connectors, or full-featured consumer clients.
apiVersion: 0.1.0
meta:
  version: 0.2.5
  name: my-http-sink
  type: http-sink
  topic: http-sink-topic
  secrets:
    - name: AUTHORIZATION_TOKEN
http:
  endpoint: "http://127.0.0.1/post"
  headers:
    - "Authorization: token ${{ secrets.AUTHORIZATION_TOKEN }}"
    - "Content-Type: application/json"
apiVersion: 0.1.0
meta:
  version: 0.3.3
  name: json-sql-connector
  type: sql-sink
  topic: sql-topic
  secrets:
    - name: DATABASE_URL
sql:
  url: ${{ secrets.DATABASE_URL }}
transforms:
  - uses: infinyon/json-sql
    with:
      mapping:
        table: "topic_message"
        map-columns:
          "device_id":
            json-key: "device.device_id"
            value:
              type: "int"
              default: "0"
              required: true
          "record":
            json-key: "$"
            value:
              type: "jsonb"
              required: true
apiVersion: 0.1.0
meta:
  version: 0.1.2
  name: weather-monitor-sandiego
  type: graphite-sink
  topic: weather-ca-sandiego
graphite:
  # https://graphite.readthedocs.io/en/latest/feeding-carbon.html#step-1-plan-a-naming-hierarchy
  metric-path: "weather.temperature.ca.sandiego"
  addr: "localhost:2003"
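The sink above points at port 2003, Carbon's default plaintext listener, and `metric-path` names the series each reading is stored under. The sketch shows what a Graphite plaintext-protocol line for this metric looks like (`<path> <value> <timestamp>` followed by a newline) and how one could be sent over TCP. It assumes the connector uses the plaintext protocol and that each record's value is a single numeric reading; it is not the connector's code, and `carbon_line` and `send_to_carbon` are hypothetical names.

```python
import socket
import time
from typing import Optional


def carbon_line(metric_path: str, value: float, timestamp: Optional[int] = None) -> str:
    """Format one metric in Graphite's plaintext protocol: path, value, Unix timestamp, newline."""
    ts = int(time.time()) if timestamp is None else timestamp
    return f"{metric_path} {value} {ts}\n"


def send_to_carbon(line: str, addr: str = "localhost:2003") -> None:
    """Send one plaintext line to a Carbon listener (needs a running Graphite/Carbon)."""
    host, port = addr.rsplit(":", 1)
    with socket.create_connection((host, int(port)), timeout=5) as sock:
        sock.sendall(line.encode("utf-8"))
```

For instance, `carbon_line("weather.temperature.ca.sandiego", 21.5, 1700000000)` produces `weather.temperature.ca.sandiego 21.5 1700000000` plus a trailing newline. Calling `send_to_carbon` on that line only succeeds if Carbon is listening on `localhost:2003`.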
apiVersion: 0.1.0
meta:
  version: 0.1.0
  name: duckdb-connector
  type: duckdb-sink
  topic: fluvio-topic-source
duckdb:
  url: 'local.db' # local duckdb
use async_std::stream::StreamExt;
use fluvio::{Fluvio, Offset};

#[async_std::main]
async fn main() {
    let topic = "rust-topic";
    let partition = 0;
    // Connect using the active Fluvio profile
    let fluvio = Fluvio::connect().await.unwrap();
    // Consume from a single partition of the topic
    let consumer = fluvio.partition_consumer(topic, partition).await.unwrap();
    // Start one offset before the end of the partition, i.e. read the latest record
    let mut stream = consumer.stream(Offset::from_end(1)).await.unwrap();
    if let Some(Ok(record)) = stream.next().await {
        let string = String::from_utf8_lossy(record.value());
        println!("{}", string);
    }
}
#!/usr/bin/env python
from fluvio import Fluvio, Offset

if __name__ == "__main__":
    topic = "python-topic"
    partition = 0
    fluvio = Fluvio.connect()
    consumer = fluvio.partition_consumer(topic, partition)
    # Read the last 10 records of the partition, then stop
    for idx, record in enumerate(consumer.stream(Offset.from_end(10))):
        print("{}".format(record.value_string()))
        if idx >= 9:
            break
import Fluvio, { Offset, Record } from "@fluvio/client";

const consume = async () => {
  const TOPIC_NAME = "hello-node";
  const PARTITION = 0;
  await fluvio.connect();
  const consumer = await fluvio.partitionConsumer(TOPIC_NAME, PARTITION);
  await consumer.stream(Offset.FromEnd(), async (record: Record) => {
    console.log(`Key=${record.keyString()}, Value=${record.valueString()}`);
    process.exit(0);
  });
};

const fluvio = new Fluvio();
consume();
## Use connector developer kit (cdk) to build your own.
$ fluvio install cdk
## Create your connector project
$ cdk generate
🤷 Project Name: my-connector
🤷 Which type of Connector would you like [source/sink]? ›
source
❯ sink
## Build & Test
$ cdk build
$ cdk test
## Deploy on your machine
$ cdk deploy
## Publish to Hub
$ cdk publish