Re: Flink SQL job submission, only one SQL statement ever executes
As far as I know, the two things that trigger job submission in Flink SQL are an INSERT INTO run through executeSql(), and StatementSet.execute(). I originally submitted my two INSERT INTO statements as two separate executeSql() calls, which produced exactly the problem described in the original message below; after switching to StatementSet.execute() I got the result I wanted. I'm not sure what the difference between the two is, so I'm recording it here for later analysis. Learning Flink as a beginner really is full of pitfalls.

package com.zallsteel.flink.app.log;

import com.google.gson.Gson;
import com.zallsteel.flink.entity.ChangelogVO;
import com.zallsteel.flink.utils.ConfigUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.text.ParseException;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;

/**
 * @author Jackie Zhu
 * @time 2021-01-13 16:50:18
 * @desc Test MySQLCDC to Hive
 *
 * flink run \
 *   -m yarn-cluster \
 *   -ys 2 \
 *   -yjm 2g \
 *   -ytm 4g \
 *   -c com.zallsteel.flink.app.log.MySQLCDC2HiveApp \
 *   -d \
 *   /opt/tools/flink-1.12.0/zallsteel-realtime-etl-1.0-SNAPSHOT.jar
 */
public class MySQLCDC2HiveApp {
    public static void main(String[] args) {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set parallelism
        env.setParallelism(6);
        // Enable checkpointing (6 ms as posted, though the table config below
        // sets the checkpoint interval to 1 minute)
        env.enableCheckpointing(6);
        env.getConfig().setAutoWatermarkInterval(200);
        // Set up the Flink SQL environment
        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        // Create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
        // Set the checkpointing mode
        tableEnv.getConfig().getConfiguration()
                .set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        // Set the checkpoint interval
        tableEnv.getConfig().getConfiguration()
                .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));
        // Catalog name
        String catalogName = "devHive";
        // Create the HiveCatalog
        HiveCatalog hiveCatalog = new HiveCatalog(
                catalogName,
                "default",
                "/etc/hive/conf",
                "/etc/hadoop/conf",
                "3.1.2"
        );
        // Register the Hive catalog
        tableEnv.registerCatalog(catalogName, hiveCatalog);
        // Use the Hive catalog
        tableEnv.useCatalog(catalogName);
        // Create the database for the MySQL CDC source
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
        // Create the MySQL CDC source table
        tableEnv.executeSql("DROP TABLE IF EXISTS cdc.order_info");
        tableEnv.executeSql("CREATE TABLE cdc.order_info(\n" +
                "id BIGINT,\n" +
                "user_id BIGINT,\n" +
                "create_time TIMESTAMP,\n" +
                "operate_time TIMESTAMP,\n" +
                "province_id INT,\n" +
                "order_status STRING,\n" +
                "total_amount DECIMAL(10, 5)\n" +
                ") WITH (\n" +
                "'connector' = 'mysql-cdc',\n" +
                "'hostname' = 'hdp-xxx-dev-node01',\n" +
                "'port' = '3306',\n" +
                "'username' = 'xxx',\n" +
                "'password' = '',\n" +
                "'database-name' = 'cdc_test',\n" +
                "'table-name' = 'order_info'\n" +
                ")");
        // Create the Kafka source
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
        tableEnv.executeSql("DROP TABLE IF EXISTS kafka.order_info");
        tableEnv.executeSql("CREATE TABLE kafka.order_info (\n" +
                "id BIGINT,\n" +
                "user_id BIGINT,\n" +
                "create_time TIMESTAMP,\n" +
                "operate_time TIMESTAMP,\n" +
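The quoted code is cut off before it reaches the INSERT statements, but the difference between the two submission styles is this: in Flink 1.12, every executeSql("INSERT ...") call plans and submits its own job immediately and asynchronously, whereas a StatementSet collects all statements added via addInsertSql() and compiles them into a single job graph when execute() is called. Below is a minimal sketch of the working pattern, assuming the cdc.order_info and kafka.order_info tables defined above plus the ods.order_info Hive sink mentioned in the original message; the SELECT bodies are placeholders, not the original queries:

    // Problematic pattern: each executeSql(INSERT ...) submits a separate job.
    // tableEnv.executeSql("INSERT INTO kafka.order_info SELECT * FROM cdc.order_info");
    // tableEnv.executeSql("INSERT INTO ods.order_info SELECT * FROM kafka.order_info");

    // Working pattern: bundle both INSERTs into one StatementSet, so that a
    // single job graph containing both pipelines is submitted on execute().
    StatementSet statementSet = tableEnv.createStatementSet();
    statementSet.addInsertSql("INSERT INTO kafka.order_info SELECT * FROM cdc.order_info");
    statementSet.addInsertSql("INSERT INTO ods.order_info SELECT * FROM kafka.order_info");
    statementSet.execute();

With the StatementSet, a single job shows up on YARN and both inserts run inside it, which matches the "desired effect" described above.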
Flink SQL job submission, only one SQL statement ever executes
I'm currently using Flink CDC to read MySQL's binlog and send it to Kafka, then reading the Kafka messages with Flink and finally writing them into Hive. But when I submit the code to YARN, I see two jobs submitted, and both of them execute insert into kafka.order_info; insert into ods.order_info never runs. The program reports no errors at all! The code is below. Am I submitting the job the wrong way, or is it some other problem?

Submit command:

flink run -m yarn-client -ynm mysql-cdc-2-hive -ys 3 -yjm 4g -ytm 8g -c com.zallsteel.flink.app.log.MySQLCDC2HiveApp -d /opt/tools/flink-1.12.0/zallsteel-realtime-etl-1.0-SNAPSHOT.jar

package com.zallsteel.flink.app.log;

import com.google.gson.Gson;
import com.zallsteel.flink.entity.ChangelogVO;
import com.zallsteel.flink.utils.ConfigUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.text.ParseException;
import java.time.Duration;
import java.util.Properties;

/**
 * flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
 */
public class MySQLCDC2HiveApp {
    public static void main(String[] args) {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set parallelism
        env.setParallelism(6);
        // Enable checkpointing (6 ms as posted, though the table config below
        // sets the checkpoint interval to 1 minute)
        env.enableCheckpointing(6);
        env.getConfig().setAutoWatermarkInterval(200);
        // Set up the Flink SQL environment
        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        // Create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
        // Set the checkpointing mode
        tableEnv.getConfig().getConfiguration()
                .set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        // Set the checkpoint interval
        tableEnv.getConfig().getConfiguration()
                .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));
        // Catalog name
        String catalogName = "devHive";
        // Create the HiveCatalog
        HiveCatalog hiveCatalog = new HiveCatalog(
                catalogName,
                "default",
                "/etc/hive/conf"
        );
        // Register the Hive catalog
        tableEnv.registerCatalog(catalogName, hiveCatalog);
        // Use the Hive catalog
        tableEnv.useCatalog(catalogName);
        // Create the database for the MySQL CDC source
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
        // Create the MySQL CDC source table
        tableEnv.executeSql("DROP TABLE IF EXISTS cdc.order_info");
        tableEnv.executeSql("CREATE TABLE cdc.order_info(\n" +
                "id BIGINT,\n" +
                "user_id BIGINT,\n" +
                "create_time TIMESTAMP,\n" +
                "operate_time TIMESTAMP,\n" +
                "province_id INT,\n" +
                "order_status STRING,\n" +
                "total_amount DECIMAL(10, 5)\n" +
                ") WITH (\n" +
                "'connector' = 'mysql-cdc',\n" +
                "'hostname' = 'hdp-xxx-dev-node01',\n" +
                "'port' = '3306',\n" +
                "'username' = 'root',\n" +
                "'password' = 'phkC4DE4dM28$PUD',\n" +
                "'database-name' = 'cdc_test',\n" +
                "'table-name' = 'order_info'\n" +
                ")");
        // Create the Kafka source
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
        tableEnv.executeSql("DROP TABLE IF EXISTS kafka.order_info");
        tableEnv.executeSql("CREATE TABLE kafka.order_info (\n" +
                "id BIGINT,\n" +
                "user_id BIGINT,\n" +
                "create_time TIMESTAMP,\n" +
                "operate_time TIMESTAMP,\n" +
                "province_id INT,\n" +
                "order_status STRING,\n" +
                "total_amount DECIMAL(10, 5)\n" +
                ") WITH (\n" +
                "'connector' = 'kafka',\n" +