This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
     new bef8eae8036 [hotfix][python] Fix Kafka csv example
bef8eae8036 is described below

commit bef8eae8036e685464ce9af461b2f58f7cc012f1
Author: Dian Fu <dia...@apache.org>
AuthorDate: Wed Nov 1 17:26:49 2023 +0800

    [hotfix][python] Fix Kafka csv example
---
 .../pyflink/examples/datastream/connectors/kafka_csv_format.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
index 4dbb243fcf9..39c134a8ed3 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
@@ -21,8 +21,7 @@ import sys
 from pyflink.common import Types
 from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
-from pyflink.datastream.formats.csv import CsvRowSerializationSchema
-from pyflink.datastream.formats.json import JsonRowDeserializationSchema
+from pyflink.datastream.formats.csv import CsvRowSerializationSchema, CsvRowDeserializationSchema
 
 
 # Make sure that the Kafka cluster is started and the topic 'test_csv_topic' is
@@ -46,9 +45,8 @@ def write_to_kafka(env):
 
 
 def read_from_kafka(env):
-    deserialization_schema = JsonRowDeserializationSchema.Builder() \
-        .type_info(Types.ROW([Types.INT(), Types.STRING()])) \
-        .build()
+    type_info = Types.ROW([Types.INT(), Types.STRING()])
+    deserialization_schema = CsvRowDeserializationSchema.Builder(type_info).build()
 
     kafka_consumer = FlinkKafkaConsumer(
         topics='test_csv_topic',