Hi Robert,

The Kafka connector has been available in the Python DataStream API since release-1.12.0. The documentation for it is still lacking; we will add it soon.
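As a side note, the consumer in the example below deserializes JSON records into rows with the fields `createTime`, `orderId`, `payAmount`, `payPlatform`, and `provinceId`. A message on the topic would therefore look like this (a hypothetical sample using only the standard library; the actual field values are up to your producer):

```python
import json

# Hypothetical sample matching the ROW_NAMED schema used below:
# createTime/orderId are LONG, payAmount is DOUBLE,
# payPlatform/provinceId are INT.
sample = {
    "createTime": 1615564800000,
    "orderId": 1001,
    "payAmount": 83.5,
    "payPlatform": 1,
    "provinceId": 32,
}

# This is the string form you would publish to the source topic.
message = json.dumps(sample)

# JsonRowDeserializationSchema parses such a payload back into a Row;
# here we just round-trip it with the json module to show the shape.
parsed = json.loads(message)
```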
The following code shows how to use FlinkKafkaConsumer and FlinkKafkaProducer:

```
from pyflink.common.serialization import (JsonRowDeserializationSchema,
                                          SimpleStringSchema)
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# Define the schema of the messages from Kafka; here the data is in JSON format.
type_info = Types.ROW_NAMED(
    ['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
    [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(), Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder() \
    .type_info(type_info).build()

# Define the Kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092',
               'group.id': 'pyflink-e2e-source'}

# Create the FlinkKafkaConsumer and FlinkKafkaProducer with the specified
# topic name, serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema,
                                    kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(),
                                    kafka_props)

# Set the Kafka source to consume data from the earliest offset.
kafka_consumer.set_start_from_earliest()

# Create a DataStream from the Kafka consumer source.
ds = env.add_source(kafka_consumer)

result_stream = ...

# Write the result into Kafka via the Kafka producer sink.
result_stream.add_sink(kafka_producer)
```

Best,
Shuiqiang

Robert Cullen <cinquate...@gmail.com> wrote on Sat, Mar 13, 2021 at 12:56 AM:
> I’ve scoured the web looking for an example of using a Kafka source for a
> DataStream in python. Can someone finish this example?
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> ds = env.from_collection( KAFKA_SOURCE )
> ...
>
> --
> Robert Cullen
> 240-475-4490