Checkpointing is not enabled on the execution environment. You need to call:
execEnv.enableCheckpointing(checkpointInterval);
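
A minimal sketch of how that looks, assuming a 10-second interval to match the "10 s" value in your snippet (variable names are illustrative):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing on the StreamExecutionEnvironment itself; the job
// submitted through this env picks the checkpoint interval up from here.
env.enableCheckpointing(10_000L); // interval in milliseconds
// Optional: EXACTLY_ONCE is already the default checkpointing mode.
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

Once checkpointing is enabled this way, the FlinkKafkaConsumer will also commit its offsets back to Kafka as part of each completed checkpoint.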

On 2021/09/10 07:41:10, "xia_...@163.com" <xia_...@163.com> wrote: 
> Hi:
>     I have a question for the experts here: I'm working on a streaming join. I use a FlinkKafkaConsumer
> to read Kafka data as the source and then join it with an HBase dimension table. The join itself works,
> but the KafkaSource never performs a checkpoint, even though checkpointing is configured in the code.
> Is there any other configuration I need to do? The code is as follows:
> 
> DataStream<String> kafkaSource = env.addSource(source);
> Map<String, OutputTag<TmpTable>> sideOutStreamMap = new HashMap<>();
> for (RowToColumnBean bean : lists) {
>     OutputTag<TmpTable> app = new OutputTag<TmpTable>(bean.getMainTable()) {
>     };
>     sideOutStreamMap.put(bean.getMainTable(), app);
> }
> 
> RowToNumberProcessFunction rowToNumberProcessFunction = new RowToNumberProcessFunction(sideOutStreamMap, lists);
> SingleOutputStreamOperator<TmpTable> process = kafkaSource.process(rowToNumberProcessFunction);
> 
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
>         .useBlinkPlanner()
>         .inStreamingMode()
>         .build();
> 
> StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, settings, new TableConfig());
> // set the checkpoint interval
> tableEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval", "10 s");
> 
> for (RowToColumnBean bean : lists) {
>     DataStream<TmpTable> dataStream = process.getSideOutput(sideOutStreamMap.get(bean.getMainTable()));
> 
>     String mainTable = bean.getMainTable().split(" ")[0].split("\\.")[1].toLowerCase();
> 
>     //Table tmpTable = tableEnv.fromDataStream(dataStream, StrUtil.list2Str(bean.getQueryColumns()));
> 
>     tableEnv.createTemporaryView(mainTable, dataStream);
> 
>     String joinTable = mainTable + "_join";
>     tableEnv.executeSql("CREATE TABLE " + joinTable + "(\n" +
>             "rowkey STRING,\n" +
>             "info ROW<formid STRING>,\n" +
>             "PRIMARY KEY (rowkey) NOT ENFORCED\n" +
>             ") WITH (\n" +
>             "'connector' = 'hbase-2.2',\n" +
>             "'table-name' = 
> 'hid0101_cache_his_dhcapp_nemrforms:dformfiled',\n" +
>             "'zookeeper.quorum' = '192.168.0.115:2181',\n" +
>             "'zookeeper.znode.parent' = '/hbase'\n" +
>             ")");
> 
> 
>     // query the data
>     //Table table = tableEnv.sqlQuery("select b.* from tmp a left join dformfiled b on a.key = b.rowkey");
>     Table table = tableEnv.sqlQuery("select a.*,b.* from " + mainTable + " a left join " + joinTable + " b on a.key = lower(b.rowkey) and b.formid='550' where b.rowkey is not null");
> 
>     TableSchema schema = table.getSchema();
>     schema.getTableColumns().forEach(column -> {
>         System.err.println(column.asSummaryString());
>     });
> 
>     DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnv.toRetractStream(table, Row.class);
>     tuple2DataStream.print(mainTable);
>     dataStream.print(mainTable);
> }
> 
> 
> xia_...@163.com
> 
