Hi: 有个问题想请教一下大佬们:正在研究流上join操作,使用FlinkKafkaConsumer 消费kafka数据作为数据源,随后关联hbase维度表数据,可以成功关联,但是KafkaSource却始终没有进行checkpoint,代码中是有设置checkpoint的,我想请问一下是需要做其他什么配置吗?代码如下:
DataStream<String> kafkaSource = env.addSource(source); Map<String, OutputTag<TmpTable>> sideOutStreamMap = new HashMap<>(); for (RowToColumnBean bean : lists) { OutputTag<TmpTable> app = new OutputTag<TmpTable>(bean.getMainTable()) { }; sideOutStreamMap.put(bean.getMainTable(), app); } RowToNumberProcessFunction rowToNumberProcessFunction = new RowToNumberProcessFunction(sideOutStreamMap, lists); SingleOutputStreamOperator<TmpTable> process = kafkaSource.process(rowToNumberProcessFunction); EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, settings, new TableConfig()); //设置checkpoint tableEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval", "10 s"); for (RowToColumnBean bean : lists) { DataStream<TmpTable> dataStream = process.getSideOutput(sideOutStreamMap.get(bean.getMainTable())); String mainTable = bean.getMainTable().split(" ")[0].split("\\.")[1].toLowerCase(); //Table tmpTable = tableEnv.fromDataStream(dataStream, StrUtil.list2Str(bean.getQueryColumns())); tableEnv.createTemporaryView(mainTable, dataStream); String joinTable = mainTable + "_join"; tableEnv.executeSql("CREATE TABLE " + joinTable + "(\n" + "rowkey STRING,\n" + "info ROW<formid STRING>,\n" + "PRIMARY KEY (rowkey) NOT ENFORCED\n" + ") WITH (\n" + "'connector' = 'hbase-2.2',\n" + "'table-name' = 'hid0101_cache_his_dhcapp_nemrforms:dformfiled',\n" + "'zookeeper.quorum' = '192.168.0.115:2181',\n" + "'zookeeper.znode.parent' = '/hbase'\n" + ")"); //查询数据 //Table table = tableEnv.sqlQuery("select b.* from tmp a left join dformfiled b on a.key = b.rowkey"); Table table = tableEnv.sqlQuery("select a.*,b.* from " + mainTable + " a left join " + joinTable + " b on a.key = lower(b.rowkey) and b.formid='550' where b.rowkey is not null"); TableSchema schema = table.getSchema(); schema.getTableColumns().forEach(column -> { 
System.err.println(column.asSummaryString()); }); DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnv.toRetractStream(table, Row.class); tuple2DataStream.print(mainTable); dataStream.print(mainTable); } xia_...@163.com