When I run the Flink job locally, it consumes the data from Kafka and writes it into Hive successfully. But after I submit the job to YARN, the Flink Web UI shows every sink in the "No Watermark" state. Checking the files in HDFS, the data is written successfully, but the partitions are never committed to the metastore and no success file is written — in other words, the watermark has no effect, even though the same job works locally. Why does it work locally but fail on YARN?

<http://apache-flink.147419.n8.nabble.com/file/t1257/%E5%BE%AE%E4%BF%A1%E5%9B%BE%E7%89%87_20210128151439.png>

This is my first time using Flink; am I using it incorrectly?
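For context, the Hive sink tables are expected to be committed with both the metastore and success-file policies once the watermark passes the partition end time. The actual DDL is not part of this post, so the following is only a minimal sketch of the kind of partition-commit properties I mean (it reuses the table name from the code below, but the columns and the dt/hr partition layout are made up, and it assumes the tableEnv from the main method after useCatalog):

    // sketch only: the real DDL is not shown in this post
    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    tableEnv.executeSql(
            "CREATE TABLE ods.trade_order_incr (" +
            "  order_id BIGINT," +                       // hypothetical columns
            "  amount DOUBLE" +
            ") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (" +
            // derive the partition time from the dt/hr partition values
            "  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'," +
            // commit a partition once the watermark passes its end time -- this
            // trigger is the one that depends on watermarks
            "  'sink.partition-commit.trigger'='partition-time'," +
            "  'sink.partition-commit.delay'='1 h'," +
            // on commit, add the partition to the metastore and write a _SUCCESS file
            "  'sink.partition-commit.policy.kind'='metastore,success-file'" +
            ")");
    tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

Below is the full job code: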
package com.xxxxx.flink.app.incr;

import com.alibaba.otter.canal.protocol.FlatMessage;
import com.xxxxx.flink.contranst.TopicPattern;
import com.xxxxx.flink.executions.TradeOrderExecutions;
import com.xxxxx.flink.function.RowsFlatMapFunction;
import com.xxxxx.flink.schema.FlatMessageSchema;
import com.xxxxx.flink.utils.ConfigUtils;
import com.xxxxx.flink.utils.TableResolveUtils;
import com.xxxxx.flink.watermark.RowWatermarkAssigner;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

import java.time.Duration;

/**
 * flink run \
 *   -m yarn-cluster \
 *   -ys 2 \
 *   -yjm 2g \
 *   -ytm 4g \
 *   -c com.xxxxx.flink.app.incr.TradeOrderBinlogResolveApp \
 *   -d \
 *   /opt/tools/flink-1.12.0/xxxxx-realtime-etl-1.0-SNAPSHOT.jar
 */
public class TradeOrderBinlogResolveApp {
    public static void main(String[] args) {
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the parallelism
        env.setParallelism(8);
        // enable checkpointing
        env.enableCheckpointing(60000);
        // set the watermark generation interval
        env.getConfig().setAutoWatermarkInterval(200);
        // set up the Flink SQL environment
        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        // create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
        // set the checkpointing mode
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        // set the checkpoint interval
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));
        // catalog name
        String catalogName = "devHive";
        // create the HiveCatalog
        HiveCatalog hiveCatalog = new HiveCatalog(
                catalogName,
                "default",
                ConfigUtils.HIVE_CONF_DIR,
                ConfigUtils.HADOOP_CONF_DIR,
                ConfigUtils.HIVE_VERSION);
        // register the Hive catalog
        tableEnv.registerCatalog(catalogName, hiveCatalog);
        // use the Hive catalog
        tableEnv.useCatalog(catalogName);
        // fetch the schema information for each table
        RowTypeInfo tradeOrderTypes = TableResolveUtils.getRowTypeinformations("ods.trade_order_incr", tableEnv);
        RowTypeInfo tradeOrderItemTypes = TableResolveUtils.getRowTypeinformations("ods.trade_order_item_incr", tableEnv);
        RowTypeInfo tradeRealDeliveryTypes = TableResolveUtils.getRowTypeinformations("ods.trade_real_delivery_incr", tableEnv);
        RowTypeInfo tradeSteelItemTypes = TableResolveUtils.getRowTypeinformations("ods.trade_steel_item_incr", tableEnv);
        // build the Kafka consumer for the non-finance business topics
        FlinkKafkaConsumerBase<FlatMessage> messages = new FlinkKafkaConsumer<>(
                TopicPattern.TRADE_PATTERN,
                new FlatMessageSchema(),
                ConfigUtils.getKafkaConfig())
                .setStartFromEarliest();
        // assign a watermark to every record
        FlinkKafkaConsumerBase<FlatMessage> messagesWaters = messages.assignTimestampsAndWatermarks(
                WatermarkStrategy.<FlatMessage>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner(new SerializableTimestampAssigner<FlatMessage>() {
                            @Override
                            public long extractTimestamp(FlatMessage element, long recordTimestamp) {
                                return element.getEs();
                            }
                        }));
        // add the source
        DataStreamSource<FlatMessage> messageSources = env.addSource(messagesWaters);
        // convert the Kafka messages for trade_order into Row objects of the specified type
        SingleOutputStreamOperator<Row> tradeOrder = messageSources
                .filter(x -> "trade_order".equalsIgnoreCase(x.getTable()))
                .flatMap(new RowsFlatMapFunction(), tradeOrderTypes);
        // trade_order_item
        SingleOutputStreamOperator<Row> tradeOrderItem = messageSources
                .filter(x -> "trade_order_item".equalsIgnoreCase(x.getTable()))
                .flatMap(new RowsFlatMapFunction(), tradeOrderItemTypes);
        // trade_real_delivery
        SingleOutputStreamOperator<Row> tradeRealDelivery = messageSources
                .filter(x -> "trade_real_delivery".equalsIgnoreCase(x.getTable()))
                .flatMap(new RowsFlatMapFunction(), tradeRealDeliveryTypes);
        // trade_steel_item
        SingleOutputStreamOperator<Row> tradeSteelItem = messageSources
                .filter(x -> "trade_steel_item".equalsIgnoreCase(x.getTable()))
                .flatMap(new RowsFlatMapFunction(), tradeSteelItemTypes);
        // register the streams as temporary views
        tableEnv.createTemporaryView("trade_order_tmp", tradeOrder);
        tableEnv.createTemporaryView("trade_order_item_tmp", tradeOrderItem);
        tableEnv.createTemporaryView("trade_real_delivery_tmp", tradeRealDelivery);
        tableEnv.createTemporaryView("trade_steel_item_tmp", tradeSteelItem);
        // write the temporary views into Hive: the INSERT INTO statements are
        // bundled into a single StatementSet and executed together
        StatementSet statementSql = TradeOrderExecutions.getStatementSql(tableEnv);
        // execute the SQL
        TableResult execute = statementSql.execute();
        try {
            // start the streaming application
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
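One more detail I am unsure about: the job runs with parallelism 8, and I have not verified that every topic matched by TopicPattern.TRADE_PATTERN has at least 8 partitions. If some Kafka source subtasks never receive any records, they would never emit a watermark and would hold the downstream watermark back, which could also explain the "No Watermark" display. Would marking idle sources be the right workaround? Something like the following sketch (the one-minute timeout is just a guess on my part):

    // sketch only: the same strategy as above, plus withIdleness so that source
    // subtasks that see no data for a while are excluded from watermark calculation
    WatermarkStrategy<FlatMessage> strategyWithIdleness =
            WatermarkStrategy.<FlatMessage>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((element, recordTimestamp) -> element.getEs())
                    .withIdleness(Duration.ofMinutes(1)); // timeout value is a guess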