| Best, Jimmy | Signature is customized by Netease Mail Master --------- 转发的邮件 --------- 发件人: Jimmy Zhang 发送日期: 2021年03月29日 14:05 收件人: Qishang 抄送人: 主题: 回复:在FlinkKafkaProducer获取sink表的建表key Hi,Qiishang,非常感谢你的回复,我看了你说的代码,应该是可以解决我的需求,不过我还没有细看,因为本身这个需求可能涉及Dynamic这块不多。 另外,我已经通过这种方法成功解决该问题。 1.我发现FlinkKafkaProducer是在KafkaTableSink.createKafkaProducer中进行构造的 2.KafkaTableSink继承于KafkaTableSinkBase,而在后者中,有TableSchema类作为成员变量,而TableSchema有getFieldNames方法获取到sink表字段名字,KafkaTableSinkBase封装了这个方法,名字一样,返回值是一个String[],这正是我需要的。 3.我在KafkaTableSink.createKafkaProducer中利用super.getFieldNames获取到String[],并新创建一个FlinkKafkaProducer的构造函数,将参数传入,达到我的目的。 | Best, Jimmy | Signature is customized by Netease Mail Master 在2021年03月29日 11:26,Qishang 写道: Hi Jimmy. FlinkKafkaProducer 里面是没有的,可以试着从 KafkaDynamicSink 里面传到 FlinkKafkaProducer 中,org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink#physicalDataType 这个里面可以拿到 Jimmy Zhang <13669299...@163.com> 于2021年3月18日周四 上午10:40写道: > Hi!大家好。 > 目前有一个需求,需要获取Kafka > sink表的所有建表字段,而且需要在FlinkKafkaProducer中进行操作,看了源码,没有找到获取这个信息的接口,大家有知道的吗?非常感谢! > 例如:CREATE TABLE kafkaTable ( > > user_id BIGINT, > item_id BIGINT, > category_id BIGINT, > behavior STRING, > ts TIMESTAMP(3) > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'user_behavior', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'testGroup', > 'format' = 'csv', > 'scan.startup.mode' = 'earliest-offset' > ) > 想获取到 user_id, item_id ,category_id ,behavior这四个字段。 > > > | | > Jimmy Zhang > | > | > 13669299...@163.com > | > 签名由网易邮箱大师定制