Hi:
    有个问题想请教一下大佬们:正在研究流上 join 操作,使用 FlinkKafkaConsumer 消费 Kafka 数据作为数据源,随后关联 HBase 维度表数据,可以成功关联,但是 KafkaSource 却始终没有进行 checkpoint。代码中是有设置 checkpoint 的,我想请问一下是需要做其他什么配置吗?代码如下:

// Build the source stream; `env`, `source` and `lists` are defined outside this snippet.
DataStream<String> kafkaSource = env.addSource(source);

// Register one side-output tag per bean, keyed by the bean's main-table string.
Map<String, OutputTag<TmpTable>> sideOutStreamMap = new HashMap<>();
for (RowToColumnBean bean : lists) {
    // Created as an anonymous subclass (empty body) of OutputTag<TmpTable>.
    OutputTag<TmpTable> app = new OutputTag<TmpTable>(bean.getMainTable()) {
    };
    sideOutStreamMap.put(bean.getMainTable(), app);
}

// Run the process function over the source stream; it receives the tag map and the bean list.
RowToNumberProcessFunction rowToNumberProcessFunction =
        new RowToNumberProcessFunction(sideOutStreamMap, lists);
SingleOutputStreamOperator<TmpTable> process = kafkaSource.process(rowToNumberProcessFunction);

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();

StreamTableEnvironment tableEnv =
        StreamTableEnvironmentImpl.create(env, settings, new TableConfig());

// Configure checkpointing: a 10-second interval written into the table configuration.
// NOTE(review): this sets the option on the TableConfig only. Confirm that it is actually
// propagated to `env`; if not, env.enableCheckpointing(10_000L) on the
// StreamExecutionEnvironment is the explicit alternative — TODO verify.
tableEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval", "10 s");

for (RowToColumnBean bean : lists) {
    // Side-output stream that was registered for this bean's main table.
    DataStream<TmpTable> dataStream =
            process.getSideOutput(sideOutStreamMap.get(bean.getMainTable()));

    // View name: first space-separated token of the main-table string, then the part after
    // the first '.', lower-cased. Presumably the input looks like "schema.table alias" —
    // TODO confirm; any other shape throws ArrayIndexOutOfBoundsException here.
    String mainTable = bean.getMainTable().split(" ")[0].split("\\.")[1].toLowerCase();

    //Table tmpTable = tableEnv.fromDataStream(dataStream, StrUtil.list2Str(bean.getQueryColumns()));

    tableEnv.createTemporaryView(mainTable, dataStream);

    // Declare the HBase-backed table that this bean's view is joined against.
    String joinTable = mainTable + "_join";
    tableEnv.executeSql("CREATE TABLE " + joinTable + "(\n" +
            "rowkey STRING,\n" +
            "info ROW<formid STRING>,\n" +
            "PRIMARY KEY (rowkey) NOT ENFORCED\n" +
            ") WITH (\n" +
            "'connector' = 'hbase-2.2',\n" +
            "'table-name' = 'hid0101_cache_his_dhcapp_nemrforms:dformfiled',\n" +
            "'zookeeper.quorum' = '192.168.0.115:2181',\n" +
            "'zookeeper.znode.parent' = '/hbase'\n" +
            ")");

    // Query the data.
    //Table table = tableEnv.sqlQuery("select b.* from tmp a left join dformfiled b on a.key = b.rowkey");
    // NOTE(review): this is a regular join, not a lookup join (no FOR SYSTEM_TIME AS OF
    // clause). Presumably the HBase side is then read as a bounded scan whose tasks finish,
    // which in some Flink versions prevents checkpoints from being triggered — TODO confirm.
    // NOTE(review): the DDL above declares formid inside the `info` ROW, yet the query
    // references b.formid; confirm whether this should be b.info.formid.
    Table table = tableEnv.sqlQuery("select a.*,b.* from " + mainTable + " a left join "
            + joinTable + " b on a.key = lower(b.rowkey) and b.formid='550' "
            + "where b.rowkey is not null");

    // Dump the result columns to stderr for debugging.
    TableSchema schema = table.getSchema();
    schema.getTableColumns().forEach(column -> {

        System.err.println(column.asSummaryString());
    });

    // Print both the joined retract stream and the raw side-output stream under the view name.
    DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnv.toRetractStream(table, Row.class);
    tuple2DataStream.print(mainTable);
    dataStream.print(mainTable);
}


xia_...@163.com

回复