没有开启 checkpoint,需要在执行环境上调用:execEnv.enableCheckpointing(checkpointInterval);
On 2021/09/10 07:41:10, "xia_...@163.com" <xia_...@163.com> wrote: > Hi: > 有个问题想请教一下大佬们:正在研究流上join操作,使用FlinkKafkaConsumer > 消费kafka数据作为数据源,随后关联hbase维度表数据,可以成功关联,但是KafkaSource却始终没有进行checkpoint,代码中是有设置checkpoint的,我想请问一下是需要做其他什么配置吗?代码如下 > > DataStream<String> kafkaSource = env.addSource(source); > Map<String, OutputTag<TmpTable>> sideOutStreamMap = new HashMap<>(); > for (RowToColumnBean bean : lists) { > OutputTag<TmpTable> app = new OutputTag<TmpTable>(bean.getMainTable()) { > }; > sideOutStreamMap.put(bean.getMainTable(), app); > } > > RowToNumberProcessFunction rowToNumberProcessFunction = new > RowToNumberProcessFunction(sideOutStreamMap, lists); > SingleOutputStreamOperator<TmpTable> process = > kafkaSource.process(rowToNumberProcessFunction); > > EnvironmentSettings settings = EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > > StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, > settings, new TableConfig()); > //设置checkpoint > tableEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval", > "10 s"); > > for (RowToColumnBean bean : lists) { > DataStream<TmpTable> dataStream = > process.getSideOutput(sideOutStreamMap.get(bean.getMainTable())); > > String mainTable = bean.getMainTable().split(" > ")[0].split("\\.")[1].toLowerCase(); > > //Table tmpTable = tableEnv.fromDataStream(dataStream, > StrUtil.list2Str(bean.getQueryColumns())); > > tableEnv.createTemporaryView(mainTable, dataStream); > > String joinTable = mainTable + "_join"; > tableEnv.executeSql("CREATE TABLE " + joinTable + "(\n" + > "rowkey STRING,\n" + > "info ROW<formid STRING>,\n" + > "PRIMARY KEY (rowkey) NOT ENFORCED\n" + > ") WITH (\n" + > "'connector' = 'hbase-2.2',\n" + > "'table-name' = > 'hid0101_cache_his_dhcapp_nemrforms:dformfiled',\n" + > "'zookeeper.quorum' = '192.168.0.115:2181',\n" + > "'zookeeper.znode.parent' = '/hbase'\n" + > ")"); > > > //查询数据 > //Table table = tableEnv.sqlQuery("select b.* from tmp 
a left join > dformfiled b on a.key = b.rowkey"); > Table table = tableEnv.sqlQuery("select a.*,b.* from " + mainTable + " a > left join " + joinTable + " b on a.key = lower(b.rowkey) and b.formid='550' > where b.rowkey is not null"); > > TableSchema schema = table.getSchema(); > schema.getTableColumns().forEach(column -> { > > System.err.println(column.asSummaryString()); > }); > > DataStream<Tuple2<Boolean, Row>> tuple2DataStream = > tableEnv.toRetractStream(table, Row.class); > tuple2DataStream.print(mainTable); > dataStream.print(mainTable); > } > > > xia_...@163.com >