Hi Robert,

The Kafka connector has been available in the Python DataStream API since release 1.12.0. The documentation for it is still lacking; we will add it soon.

The following code shows how to use FlinkKafkaConsumer and FlinkKafkaProducer:
```
from pyflink.common.serialization import (JsonRowDeserializationSchema,
                                          SimpleStringSchema)
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the messages from kafka; here the data is in json format.
type_info = Types.ROW_NAMED(
    ['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
    [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(), Types.INT()])
json_row_schema = \
    JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092',
               'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic names,
# serialization/deserialization schemas and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema,
                                    kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink",
                                    SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from the earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from the kafka consumer source.
ds = env.add_source(kafka_consumer)

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)

# submit the job for execution.
env.execute("kafka-example")
```
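
For reference, a message on the source topic would need to be a JSON object matching the ROW_NAMED schema above. Here is a minimal sketch of such a payload (the field values are made up for illustration):

```python
import json

# an illustrative payload whose fields match the type_info schema:
# createTime/orderId are longs, payAmount is a double,
# payPlatform/provinceId are ints.
message = {
    "createTime": 1615593600000,
    "orderId": 1001,
    "payAmount": 83.5,
    "payPlatform": 1,
    "provinceId": 3,
}

# this is the string you would produce to the "timer-stream-source" topic.
payload = json.dumps(message)
print(payload)
```

The JsonRowDeserializationSchema will turn each such message into a Row with the five named fields, in the declared order.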

Best,
Shuiqiang

Robert Cullen <cinquate...@gmail.com> wrote on Saturday, March 13, 2021 at 12:56 AM:

> I’ve scoured the web looking for an example of using a Kafka source for a
> DataStream in python. Can someone finish this example?
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> ds = env.from_collection( KAFKA_SOURCE )
> ...
>
> --
> Robert Cullen
> 240-475-4490
>