This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new bef8eae8036 [hotfix][python] Fix Kafka csv example
bef8eae8036 is described below

commit bef8eae8036e685464ce9af461b2f58f7cc012f1
Author: Dian Fu <dia...@apache.org>
AuthorDate: Wed Nov 1 17:26:49 2023 +0800

    [hotfix][python] Fix Kafka csv example
---
 .../pyflink/examples/datastream/connectors/kafka_csv_format.py    | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
index 4dbb243fcf9..39c134a8ed3 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
@@ -21,8 +21,7 @@ import sys
 from pyflink.common import Types
 from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
-from pyflink.datastream.formats.csv import CsvRowSerializationSchema
-from pyflink.datastream.formats.json import JsonRowDeserializationSchema
+from pyflink.datastream.formats.csv import CsvRowSerializationSchema, CsvRowDeserializationSchema
 
 
 # Make sure that the Kafka cluster is started and the topic 'test_csv_topic' is
@@ -46,9 +45,8 @@ def write_to_kafka(env):
 
 
 def read_from_kafka(env):
-    deserialization_schema = JsonRowDeserializationSchema.Builder() \
-        .type_info(Types.ROW([Types.INT(), Types.STRING()])) \
-        .build()
+    type_info = Types.ROW([Types.INT(), Types.STRING()])
+    deserialization_schema = CsvRowDeserializationSchema.Builder(type_info).build()
 
     kafka_consumer = FlinkKafkaConsumer(
         topics='test_csv_topic',

Reply via email to