Python Examples
- The Python client wraps the rust client.
- It currently does not support the administrator features that the rust client does.
- The PartitionConsumer.stream returns an object which implements the python iterator convention to allow for iterating over the stream in a for-loop.
To see the full docs, visit our pdoc page.
Follow the installation instructions to run this example.
Create the topic used to produce and consume records:
fluvio topic create python-data
Login to Infinyon Cloud using username/password
from fluvio import cloud
cloud.login(email="my@email.com", password="mypassword")
You can also use the oauth method to log in. However this is only for interactive sessions.
from fluvio import cloud
cloud.login(Oauth2=true)
Create a file called python-produce.py:
#!/usr/bin/env python
from datetime import datetime
from fluvio import Fluvio
TOPIC_NAME = "python-data"
PARTITION = 0
if __name__ == "__main__":
   # Connect to cluster
   fluvio = Fluvio.connect()
   # Produce 10 records to topic
   producer = fluvio.topic_producer(TOPIC_NAME)
   for x in range(10):
       producer.send_string("{}: timestamp: {}".format(x, datetime.now()))
   # Flush the last entry
   producer.flush()
Let’s run the file:
$ python python-produce.py
Create a file called python-consume.py:
#!/usr/bin/env python
from fluvio import Fluvio, Offset
TOPIC_NAME = "python-data"
PARTITION = 0
if __name__ == "__main__":
   # Connect to cluster
   fluvio = Fluvio.connect()
   # Consume last 10 records from topic
   consumer = fluvio.partition_consumer(TOPIC_NAME, PARTITION)
   for idx, record in enumerate( consumer.stream(Offset.from_end(10)) ):
       print("{}".format(record.value_string()))
       
       if idx >= 9:
           break
Let’s run the file:
$ python python-consume.py
- Fluvio cluster administration is not supported.
- Python async is not supported.
#!/usr/bin/env python
import os
from datetime import datetime
from fluvio import Fluvio, Offset, ConsumerCoonfig
TOPIC_NAME = "hello-python-smartmodule"
PARTITION = 0
# This is an example of a basic Fluvio workflow in Python
#
# 1. Create a topic to store data in via CLI
# 2. Establish a connection to the Fluvio cluster
# 3. Create a producer and send some bytes
# 4. Create a consumer, and stream the data back
if __name__ == "__main__":
   # Currently the Python client does not support creating topics
   # Using the fluvio CLI
   os.popen("fluvio topic create {}".format(TOPIC_NAME))
   # Connect to cluster
   fluvio = Fluvio.connect()
   # Produce to topic
   producer = fluvio.topic_producer(TOPIC_NAME)
   producer.send_string("Hello World! - Time is: {}".format(datetime.now()))
   # Consume from topic
   # We're just going to get the last record
   consumer = fluvio.partition_consumer(TOPIC_NAME, PARTITION)
   # Create a ConsumerConfig using your "uppercase-map" smartmodule
   config = ConsumerConfig()
   config.smartmodule(name="uppercase-map")
   for record in consumer.stream_with_config(Offset.from_end(0), config):
       print("{}".format(record.value_string()))
       break