Stateful Services (private release) Build composable event-driven data pipelines in minutes.

Request access

How to Write to Apache Kafka from a Fluvio topic

Grant Swanson

Grant Swanson

VP Marketing, InfinyOn

SHARE ON
GitHub stars

This blog shows the power of Fluvio for performing real-time data transformations and provides a step by step example of how to stream clean data to a Kafka topic. In this example we are taking source data from the Finnhub API with our HTTP source connector, aggregating stock prices, and caluclating unrealised gains or losses in real-time before we send it to Apache Kafka.

Start

Install minikube, helm, kubectr with the following instructions: https://www.fluvio.io/docs/get-started/linux/#installing-kubernetes-cluster.

Install Fluvio.

Install Fluvio CLI:

$ curl -fsS https://hub.infinyon.cloud/install/install.sh | bash

This command will download the Fluvio Version Manager (fvm), Fluvio CLI (fluvio) and config files into $HOME/.fluvio, with the executables in $HOME/.fluvio/bin. To complete the installation, you will need to add the executables to your shell $PATH.

Start Fluvio Cluster:

$ fluvio cluster start

Verify the cluster is running:

$ fluvio topic create greetings
echo "Hello, Fluvio" | fluvio produce greetings
fluvio consume greetings -B -d

Part one

Fluvio topic to Kafka Sink

Recap of the Financal Services Demo

• git clone https://github.com/infinyon/fluvio-demo-04-12-2022-finance-demo.git

• register on finhub.io and obtain api token

• update API token in quote-data-input.yaml

Create a HTTP connector Check if the fluvio topic is populated:

$ fluvio consume gme-stocks -B

Start a local Apache Kafka dev

Clone https://github.com/infinyon/kafka_webinar_16_August_2022 and change the value ADV_HOST in docker-compose-webinar.yml, where ADV_HOST is pinned to minikube network gateway 192.168.49.1:

check minikube ip

$ minikube ip
192.168.49.2

and amend ADV_HOST in docker-compose-webinar.yml

$ docker compose -f docker-compose-webinar.yml up -d

Validate that Kafka is working

$ docker run --rm -it --net=host lensesio/fast-data-dev kafka-topics --zookeeper localhost:2181 --list

Write to Kafka from Fluvio topic

ADV_HOST and kafka_url in webinar-kafka-sink-connector.yml shall match to local IP (ifconfig| grep inet for linux)

fluvio connector create -c ./webinar-kafka-sink-connector.yml
fluvio connector logs -f my-kafka-sink1

Part two

Write clean data to Kafka from a Fluvio topic

Apply a Smart Module to a fluvio topic before writing to Kafka Smart module calculates unrealised gains or losses. Runs an aggregate function on an assumed “purchased” stocks (warrant).

fn update_profit(&mut self) {
        let mut profit = 0.0;
        for warrant in &self.warrants {
            profit += (self.current_price - (warrant.exercise_price + warrant.purchase_price))*warrant.count as f64;
        }

where warrents.txt

{"expiry_date": "Tue, 11 Apr 2022 13:50:37 +0000", "exercise_price": 140.0, "purchase_price": 12.0, "count": 10}
{"expiry_date": "Tue, 12 Apr 2022 13:50:37 +0000", "exercise_price": 110.0, "purchase_price": 10.0, "count": 11}
{"expiry_date": "Tue, 12 Apr 2022 17:50:37 +0000", "exercise_price": 150.0, "purchase_price": 11.0, "count": 12}
{"expiry_date": "Tue, 13 Apr 2022 13:50:37 +0000", "exercise_price": 160.0, "purchase_price": 13.0, "count": 13}

In the fluvio-demo-04-12-2022-finance-demo folder run

make sm-upload
make produce-warrants
make sm-consume

Those commands will compile and upload a smart module. Produce warrants will generate purchase orders so current profit can be calculated.

Start Kafka sink connector with SmartModule

fluvio connector create -c ./webinar-kafka-sink-connector-with-sm.yml

Rerun produce warrants: In fluvio-demo-04-12-2022-finance-demo run

%copy% Sink connector reads fluvio topic from the end, and we are re-running make produce-warrants to make sure fluvio topic is populated, which is then appearing in kafka-aggregate-fluvio.

Watch kafka topic via Web UI http://localhost:3030/kafka-topics-ui/#/cluster/fast-data-dev/topic/n/kafka-aggregate-fluvio/ or via command line:

docker run --rm -it --net=host landoop/fast-data-dev kafka-console-consumer --topic kafka-aggregate-fluvio --bootstrap-server "192.168.1.89:9092"

Webinar recording: Enhance your kafka infrastructure with fluvio

See webinar with live demo.

Stay connected

Connect with us:

Please, be sure to join our Discord server if you want to talk to us or have any questions.

Subscribe to our YouTube channel

Follow us on Twitter

Follow us on LinkedIn

Have a happy coding, and stay tuned!

Additional Notes

Running Kafka commands:

docker run --rm -it --net=host landoop/fast-data-dev kafka-topics --zookeeper localhost:2181 --list
docker run --rm -it --net=host landoop/fast-data-dev kafka-console-consumer --topic kafka-aggregate-fluvio --bootstrap-server "192.168.49.1:9092"

Further reading