??????????????????????????????????????????????






------------------ ???????? ------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<appleyu...@163.com>;
????????: 2021??1??30??(??????) ????3:08
??????: "user-zh"<user-zh@flink.apache.org>;

????: Re:??????????Pyflink??



??????kafka??????????????????????.

????????????????????,??????Flink.













?? 2021-01-30 14:25:57??"??????" <389243...@qq.com> ??????

????????????????????Flink????kafka??????csv??????????????????????Flink??Kafka????kerberos??????????????????????????pyflink??????????????????????????????????????????csv??????????????????????????????????????????????????????kafka
 ??????????????????????????????????
1??Kafka??????????

2??pyflink ????


#!/usr/bin/python3.7
# -*- coding: UTF-8 -*-
"""Read JSON records from a Kerberos-secured Kafka topic and append them to a local CSV file.

The script registers a Kafka table source ("sourceKafka") and a CSV table sink
("csvTableSink"), then runs a pass-through query between them.

NOTE(review): the original paste was hard-wrapped by a mail client, which split the
``from pyflink.table import ...`` statement and several call chains mid-line; the
code below restores valid Python without changing any connector settings.
"""
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import (
    StreamTableEnvironment,
    TableConfig,
    DataTypes,
    CsvTableSink,
    WriteMode,
    SqlDialect,
)
from pyflink.table.descriptors import FileSystem, OldCsv, Schema, Kafka, Json

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
s_env.enable_checkpointing(3000)  # checkpoint every 3 seconds

st_env = StreamTableEnvironment.create(s_env, TableConfig())
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")

# Kafka source: earliest offsets, SASL_PLAINTEXT with Kerberos
# (security settings match the cluster described in the thread — verify against
# the actual krb5/jaas configuration passed to the Flink cluster).
st_env.connect(
    Kafka()
    .version("universal")
    .topic("qyq13")
    .start_from_earliest()
    .property("zookeeper.connect",
              "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002")
    .property("security.protocol", 'SASL_PLAINTEXT')
    .property("sasl.kerberos.service.name", 'kafka')
    .property("kerberos.domain.name", 'hadoop.hadoop.com')
    .property("bootstrap.servers",
              "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007")
).with_format(
    # Tolerate missing fields in the incoming JSON rather than failing the job.
    Json().fail_on_missing_field(False).schema(
        DataTypes.ROW([
            DataTypes.FIELD("id", DataTypes.BIGINT()),
            DataTypes.FIELD("name", DataTypes.STRING()),
        ])
    )
).with_schema(
    Schema().field("id", DataTypes.BIGINT()).field("name", DataTypes.STRING())
).register_table_source("sourceKafka")

fieldNames = ["id", "name"]
fieldTypes = [DataTypes.BIGINT(), DataTypes.STRING()]
csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", ",", 1,
                       WriteMode.OVERWRITE)
st_env.register_table_sink("csvTableSink", csvSink)

resultQuery = st_env.sql_query("select id,name from sourceKafka")
# insert_into() returns None — do not rebind resultQuery to its result
# (the original did `resultQuery = resultQuery.insert_into(...)`).
resultQuery.insert_into("csvTableSink")
st_env.execute("pyflink-kafka-v2")
3??pyflink-shell.sh local

4)????????
??pyflink-shell local??????????????????????????????????????????????????????

??????????????????????????????????????????????????????????????????????????pyflink????????????????????????????????????????????????????kerberos??????kafka????????????????????????????????????????????????????????????????????flink????kafka-hdfs??????????????????????????????????????????????????????????

回复