图上看不出有什么异常,可以看一下 task manager 日志 Best regards
左岩 <13520871...@163.com> 于2022年10月31日周一 11:34写道: > > > 还是没有消费到,麻烦查看附件中的图片 > > > > > > 在 2022-10-31 10:03:05,"guozhi mang" <rookiegao...@gmail.com> 写道: > >我想你的格式错了 > >下面我修改了一下 > >tenv.executeSql( > >" create table t_upsert_kafka( " > >+ " userid int , " > >+ " username string, " > >+ " age int, " > >+ " `partition` int , " > >+ " PRIMARY KEY (userid) NOT ENFORCED " > >+ " ) with ( " > >+ " 'connector' = 'upsert-kafka', " > >+ " 'topic' = 'test02', " > >+ " 'properties.bootstrap.servers' = '192.168.0.82:9092', " > >+ " 'key.format' = 'json', " > >+ " 'value.format' = 'json' " > >+ " ) " > >); > > > >*下面是官方案例* > > > >CREATE TABLE pageviews_per_region ( > > user_region STRING, > > pv BIGINT, > > uv BIGINT, > > PRIMARY KEY (user_region) NOT ENFORCED) WITH ( > > 'connector' = 'upsert-kafka', > > 'topic' = 'pageviews_per_region', > > 'properties.bootstrap.servers' = '...', > > 'key.format' = 'avro', > > 'value.format' = 'avro'); > > > > > >左岩 <13520871...@163.com> 于2022年10月31日周一 09:57写道: > > > >> > >> > >> > >> > >> public static void main(String[] args) throws Exception { > >> Configuration conf = new Configuration(); > >> conf.setInteger("rest.port", 10041); > >> StreamExecutionEnvironment env = > >> StreamExecutionEnvironment.getExecutionEnvironment(conf); > >> StreamTableEnvironment tenv = StreamTableEnvironment.create(env); > >> // env.setParallelism(1); > >> > >> env.enableCheckpointing(3000); > >> env.setStateBackend(new HashMapStateBackend()); > >> env.getCheckpointConfig().setCheckpointStorage("file:///d:/zuoyanckpt"); > >> > >> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > >> env.getCheckpointConfig().setCheckpointTimeout(20 * 1000); > >> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); > >> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); > >> > >> env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > >> > >> // 创建目标 kafka映射表 > >> tenv.executeSql( > >> " create table t_upsert_kafka( " > >> + " userid int primary key not enforced, " > >> + " username string, " > >> + " age int, " > >> + " `partition` int " > >> + " ) with ( " > >> + " 'connector' = 'upsert-kafka', " > >> + " 'topic' = 'test02', " > >> + " 'properties.bootstrap.servers' = '192.168.0.82:9092', " > >> + " 'key.format' = 'json', " > >> + " 'value.format' = 'json' " > >> + " ) " > >> ); > >> > >> tenv.executeSql("select * from t_upsert_kafka").print(); > >> > >> tenv.executeSql( > >> " CREATE TABLE t_kafka_connector ( " > >> + " userid int , " > >> + " username string, " > >> + " age int, " > >> + " `partition` int " > >> + " ) WITH ( " > >> + " 'connector' = 'kafka', " > >> + " 'topic' = 'test02', " > >> + " 'properties.bootstrap.servers' = '192.168.0.82:9092', " > >> + " 'properties.group.id' = 'testGroup1', " > >> + " 'scan.startup.mode' = 'earliest-offset', " > >> + " 'format'='json' " > >> + " ) " > >> > >> ); > >> > >> tenv.executeSql("select * from t_kafka_connector").print(); > >> > >> env.execute(); > >> > >> > >> > >> > >> > >> t_upsert_kafka 消费不到 t_kafka_connector可以消费到 > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2022-10-31 09:43:49,"Shengkai Fang" <fskm...@gmail.com> 写道: > >> >hi, > >> > > >> >看不到的图片。能不能直接展示文字或者用图床工具? > >> > > >> >Best, > >> >Shengkai > >> > > >> >左岩 <13520871...@163.com> 于2022年10月28日周五 18:34写道: > >> > > >> >> upsert kafka作为source时,消费不到kafka中的数据 > >> >> 通过flink sql 写了一个upsert kafka connector 的连接器,去读一个topic,读不到数据,但是普通kafka > >> >> 连接器消费这个topic 就能读到,都是读的同一个topic,代码如下 > >> >> > >> > >