图上看不出有什么异常,可以看一下 task manager 日志

Best regards


左岩 <13520871...@163.com> 于2022年10月31日周一 11:34写道:

>
>
> 还是没有消费到,麻烦查看附件中的图片
>
>
>
>
>
> 在 2022-10-31 10:03:05,"guozhi mang" <rookiegao...@gmail.com> 写道:
> >我想你的格式错了
> >下面我修改了一下
> >tenv.executeSql(
> >" create table t_upsert_kafka(                 "
> >+ "    userid int ,        "
> >+ "    username string,                          "
> >+ "    age int,                             "
> >+ "    `partition` int ,                            "
> >+ "  PRIMARY KEY (userid) NOT ENFORCED "
> >+ " ) with (                                    "
> >+ "  'connector' = 'upsert-kafka',              "
> >+ "  'topic' = 'test02',                "
> >+ "  'properties.bootstrap.servers' = '192.168.0.82:9092',  "
> >+ "  'key.format' = 'json',                             "
> >+ "  'value.format' = 'json'                            "
> >+ " )                                                  "
> >);
> >
> >*下面是官方案例*
> >
> >CREATE TABLE pageviews_per_region (
> >  user_region STRING,
> >  pv BIGINT,
> >  uv BIGINT,
> >  PRIMARY KEY (user_region) NOT ENFORCED) WITH (
> >  'connector' = 'upsert-kafka',
> >  'topic' = 'pageviews_per_region',
> >  'properties.bootstrap.servers' = '...',
> >  'key.format' = 'avro',
> >  'value.format' = 'avro');
> >
> >
> >左岩 <13520871...@163.com> 于2022年10月31日周一 09:57写道:
> >
> >>
> >>
> >>
> >>
> >> public static void main(String[] args) throws Exception {
> >> Configuration conf = new Configuration();
> >> conf.setInteger("rest.port", 10041);
> >> StreamExecutionEnvironment env =
> >> StreamExecutionEnvironment.getExecutionEnvironment(conf);
> >> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> >> //        env.setParallelism(1);
> >>
> >> env.enableCheckpointing(3000);
> >> env.setStateBackend(new HashMapStateBackend());
> >> env.getCheckpointConfig().setCheckpointStorage("file:///d:/zuoyanckpt");
> >>
> >> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> >> env.getCheckpointConfig().setCheckpointTimeout(20 * 1000);
> >> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> >> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
> >>
> >> env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >>
> >> // 创建目标 kafka映射表
> >> tenv.executeSql(
> >> " create table t_upsert_kafka(                 "
> >> + "    userid int primary key not enforced,        "
> >> + "    username string,                          "
> >> + "    age int,                             "
> >> + "    `partition` int                             "
> >> + " ) with (                                    "
> >> + "  'connector' = 'upsert-kafka',              "
> >> + "  'topic' = 'test02',                "
> >> + "  'properties.bootstrap.servers' = '192.168.0.82:9092',  "
> >> + "  'key.format' = 'json',                             "
> >> + "  'value.format' = 'json'                            "
> >> + " )                                                  "
> >> );
> >>
> >> tenv.executeSql("select * from t_upsert_kafka").print();
> >>
> >> tenv.executeSql(
> >> " CREATE TABLE t_kafka_connector (                       "
> >> + "    userid int ,        "
> >> + "    username string,                          "
> >> + "    age int,                             "
> >> + "    `partition` int                             "
> >> + " ) WITH (                                               "
> >> + "  'connector' = 'kafka',                                "
> >> + "  'topic' = 'test02',                             "
> >> + "  'properties.bootstrap.servers' = '192.168.0.82:9092',      "
> >> + "  'properties.group.id' = 'testGroup1',                  "
> >> + "  'scan.startup.mode' = 'earliest-offset',           "
> >> + "  'format'='json'                               "
> >> + " )                                                   "
> >>
> >> );
> >>
> >> tenv.executeSql("select * from t_kafka_connector").print();
> >>
> >> env.execute();
> >>
> >>
> >>
> >>
> >>
> >> t_upsert_kafka 消费不到   t_kafka_connector可以消费到
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2022-10-31 09:43:49,"Shengkai Fang" <fskm...@gmail.com> 写道:
> >> >hi,
> >> >
> >> >看不到的图片。能不能直接展示文字或者用图床工具?
> >> >
> >> >Best,
> >> >Shengkai
> >> >
> >> >左岩 <13520871...@163.com> 于2022年10月28日周五 18:34写道:
> >> >
> >> >> upsert kafka作为source时,消费不到kafka中的数据
> >> >> 通过flink sql 写了一个upsert kafka connector 的连接器,去读一个topic,读不到数据,但是普通kafka
> >> >> 连接器消费这个topic 就能读到,都是读的同一个topic,代码如下
> >> >>
> >>
>
>

回复