
How to Write to Apache Kafka from a Fluvio topic

by Grant Swanson (@ruststreaming)
| September 19, 2022
| Reading time: 3 minutes

This blog shows the power of Fluvio for performing real-time data transformations and provides a step-by-step example of how to stream clean data to a Kafka topic. In this example we take source data from the Finnhub API with our HTTP source connector, aggregate stock prices, and calculate unrealised gains or losses in real time before sending the data to Apache Kafka.

 

Start

Install minikube, helm, and kubectl by following these instructions: https://www.fluvio.io/docs/get-started/linux/#installing-kubernetes-cluster.

 

Install Fluvio.

$ curl -fsS https://packages.fluvio.io/v1/install.sh | bash

Start a Fluvio cluster:

$ fluvio cluster start

Verify the cluster is running:

$ fluvio topic create greetings
$ echo "Hello, Fluvio" | fluvio produce greetings
$ fluvio consume greetings -B -d
 

Part one

Fluvio topic to Kafka Sink

 

Recap of the Financial Services Demo

• git clone https://github.com/infinyon/fluvio-demo-04-12-2022-finance-demo.git

• register on finnhub.io and obtain an API token

• update the API token in quote-data-input.yaml

Create the HTTP connector (e.g. fluvio connector create -c ./quote-data-input.yaml), then check that the Fluvio topic is populated:

$ fluvio consume gme-stocks -B
 

Start a local Apache Kafka dev environment

Clone https://github.com/infinyon/kafka_webinar_16_August_2022 and set ADV_HOST in docker-compose-webinar.yml to the minikube network gateway, 192.168.49.1:

Check the minikube IP:

$ minikube ip
192.168.49.2

and amend ADV_HOST in docker-compose-webinar.yml accordingly, then bring the stack up:

$ docker compose -f docker-compose-webinar.yml up -d
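Inside docker-compose-webinar.yml, the setting to change looks roughly like this. This is a sketch of a fast-data-dev service definition, not the exact file from the repo; the image does read an ADV_HOST environment variable, but the surrounding service layout may differ:

```yaml
services:
  kafka:
    image: lensesio/fast-data-dev
    environment:
      ADV_HOST: 192.168.49.1   # minikube network gateway
```

ADV_HOST controls the advertised listener address, which is why it must be reachable from inside the minikube network rather than being localhost.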

Validate that Kafka is working:

$ docker run --rm -it --net=host lensesio/fast-data-dev kafka-topics --zookeeper localhost:2181 --list
 

Write to Kafka from Fluvio topic

ADV_HOST and kafka_url in webinar-kafka-sink-connector.yml must match your local IP (run ifconfig | grep inet on Linux):

fluvio connector create -c ./webinar-kafka-sink-connector.yml
fluvio connector logs -f my-kafka-sink1
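For reference, a Fluvio kafka-sink connector config of that era followed roughly this shape. This is a sketch only: the field names are assumptions based on the managed-connector conventions, and the webinar-kafka-sink-connector.yml shipped with the repo is authoritative:

```yaml
# Sketch of a kafka-sink connector config (field names are assumptions;
# consult webinar-kafka-sink-connector.yml in the repo for the real values).
api_version: v1
name: my-kafka-sink1
type: kafka-sink
topic: gme-stocks                     # Fluvio topic to read from
parameters:
  kafka-url: "192.168.49.1:9092"      # must match ADV_HOST
  kafka-topic: "kafka-fluvio"         # destination Kafka topic (assumed name)
```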
 

Part two

Write clean data to Kafka from a Fluvio topic

Apply a SmartModule to the Fluvio topic before writing to Kafka. The SmartModule calculates unrealised gains or losses by running an aggregate function over assumed “purchased” stocks (warrants).

fn update_profit(&mut self) {
    let mut profit = 0.0;
    for warrant in &self.warrants {
        // unrealised gain/loss per warrant: (current price - total cost) * count
        profit += (self.current_price - (warrant.exercise_price + warrant.purchase_price)) * warrant.count as f64;
    }
    self.profit = profit;
}

where warrants.txt contains:

{"expiry_date": "Tue, 11 Apr 2022 13:50:37 +0000", "exercise_price": 140.0, "purchase_price": 12.0, "count": 10}
{"expiry_date": "Tue, 12 Apr 2022 13:50:37 +0000", "exercise_price": 110.0, "purchase_price": 10.0, "count": 11}
{"expiry_date": "Tue, 12 Apr 2022 17:50:37 +0000", "exercise_price": 150.0, "purchase_price": 11.0, "count": 12}
{"expiry_date": "Tue, 13 Apr 2022 13:50:37 +0000", "exercise_price": 160.0, "purchase_price": 13.0, "count": 13}
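To see the arithmetic concretely, here is a standalone sketch of the same calculation over the four warrants above. The current price of 180.0 is an assumed example value, not a figure from the demo:

```rust
// Standalone sketch of the SmartModule's profit calculation.
// current_price = 180.0 is an assumed example value.

struct Warrant {
    exercise_price: f64,
    purchase_price: f64,
    count: u32,
}

fn update_profit(warrants: &[Warrant], current_price: f64) -> f64 {
    let mut profit = 0.0;
    for w in warrants {
        // (current price - total cost per share) * number of shares
        profit += (current_price - (w.exercise_price + w.purchase_price)) * w.count as f64;
    }
    profit
}

fn main() {
    // The four warrants from warrants.txt above.
    let warrants = [
        Warrant { exercise_price: 140.0, purchase_price: 12.0, count: 10 },
        Warrant { exercise_price: 110.0, purchase_price: 10.0, count: 11 },
        Warrant { exercise_price: 150.0, purchase_price: 11.0, count: 12 },
        Warrant { exercise_price: 160.0, purchase_price: 13.0, count: 13 },
    ];
    let profit = update_profit(&warrants, 180.0);
    println!("unrealised profit: {profit}"); // prints "unrealised profit: 1259"
}
```

Each line contributes (180 - (exercise + purchase)) * count, i.e. 280 + 660 + 228 + 91 = 1259.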

In the fluvio-demo-04-12-2022-finance-demo folder, run:

make sm-upload
make produce-warrants
make sm-consume

These commands compile and upload the SmartModule; make produce-warrants generates purchase orders so the current profit can be calculated.

 

Start Kafka sink connector with SmartModule

fluvio connector create -c ./webinar-kafka-sink-connector-with-sm.yml

Rerun produce warrants. In fluvio-demo-04-12-2022-finance-demo run:

make produce-warrants

The sink connector reads the Fluvio topic from the end, so we re-run make produce-warrants to make sure the topic is populated; the records then appear in kafka-aggregate-fluvio.

Watch the Kafka topic via the web UI at http://localhost:3030/kafka-topics-ui/#/cluster/fast-data-dev/topic/n/kafka-aggregate-fluvio/ or via the command line:

docker run --rm -it --net=host landoop/fast-data-dev kafka-console-consumer --topic kafka-aggregate-fluvio --bootstrap-server "192.168.1.89:9092"
 

Webinar recording: Enhance your Kafka Infrastructure with Fluvio

See the webinar recording with the live demo.

Stay connected

 

Connect with us:

Please be sure to join our Discord server if you want to talk to us or have any questions.

Subscribe to our YouTube channel

Follow us on Twitter

Follow us on LinkedIn

Happy coding, and stay tuned!

 

Additional Notes

Running Kafka commands:

docker run --rm -it --net=host landoop/fast-data-dev kafka-topics --zookeeper localhost:2181 --list
docker run --rm -it --net=host landoop/fast-data-dev kafka-console-consumer --topic kafka-aggregate-fluvio --bootstrap-server "192.168.49.1:9092"
 

Further reading