Re: Flink SQL job submission: only one of the SQL statements ever executes

2021-01-19 Posted by 花乞丐
As far as I know, what triggers job submission in Flink SQL is an INSERT INTO run through executeSql(), as well as StatementSet.execute(). I had originally submitted my two INSERT INTO statements as two separate executeSql() calls, which produced the behavior described above; after switching to StatementSet.execute() I got the result I wanted. I'm not sure exactly what the difference between the two is, so I'm recording it here for later analysis. Learning Flink as a beginner really is full of pitfalls.
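
For reference, a minimal sketch of the StatementSet approach (the SELECT bodies below are placeholders, since the full INSERT statements are truncated in the quoted code; the table names are the ones from this thread). Each executeSql("INSERT ...") submits its own detached job as soon as it is called, whereas a StatementSet only buffers the statements and submits them together as a single job when execute() is called:

StatementSet statementSet = tableEnv.createStatementSet();
// addInsertSql() only buffers the statement; nothing is submitted yet
statementSet.addInsertSql("INSERT INTO kafka.order_info SELECT * FROM cdc.order_info");
statementSet.addInsertSql("INSERT INTO ods.order_info SELECT * FROM kafka.order_info");
// one call, one job: both INSERT pipelines are planned and submitted together
statementSet.execute();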

package com.zallsteel.flink.app.log;


import com.google.gson.Gson;
import com.zallsteel.flink.entity.ChangelogVO;
import com.zallsteel.flink.utils.ConfigUtils;

import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;


import java.text.ParseException;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;

/**
 * @author Jackie Zhu
 * @time 2021-01-13 16:50:18
 * @desc Test: MySQL CDC to Hive
flink run \
-m yarn-cluster \
-ys 2 \
-yjm 2g \
-ytm 4g \
-c com.zallsteel.flink.app.log.MySQLCDC2HiveApp \
-d \
/opt/tools/flink-1.12.0/zallsteel-realtime-etl-1.0-SNAPSHOT.jar
 */
public class MySQLCDC2HiveApp {
public static void main(String[] args) {
// Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set parallelism
env.setParallelism(6);
// Enable checkpointing; the argument is the interval in milliseconds,
// so 6 ms is almost certainly a typo (the table config below sets 1 minute)
env.enableCheckpointing(6);

env.getConfig().setAutoWatermarkInterval(200);
// Set up the Flink SQL environment
EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// Create the table environment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
// Set the checkpointing mode
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
// Set the checkpoint interval
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));
// Catalog name
String catalogName = "devHive";
// Create the HiveCatalog
HiveCatalog hiveCatalog = new HiveCatalog(catalogName,
"default",
"/etc/hive/conf",
"/etc/hadoop/conf",
"3.1.2"
);
// Register the Hive catalog
tableEnv.registerCatalog(catalogName, hiveCatalog);
// Use the Hive catalog
tableEnv.useCatalog(catalogName);
// Create a database for the MySQL CDC source table
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
// (Re)create the MySQL CDC source table
tableEnv.executeSql("DROP TABLE IF EXISTS cdc.order_info");
tableEnv.executeSql("CREATE TABLE cdc.order_info(\n" +
"id BIGINT,\n" +
"user_id BIGINT,\n" +
"create_time TIMESTAMP,\n" +
"operate_time TIMESTAMP,\n" +
"province_id INT,\n" +
"order_status STRING,\n" +
"total_amount DECIMAL(10, 5)\n" +
"  ) WITH (\n" +
"'connector' = 'mysql-cdc',\n" +
"'hostname' = 'hdp-xxx-dev-node01',\n" +
"'port' = '3306',\n" +
"'username' = 'xxx',\n" +
"'password' = '',\n" +
"'database-name' = 'cdc_test',\n" +
"'table-name' = 'order_info'\n" +
")");
// Create the Kafka source
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
tableEnv.executeSql("DROP TABLE IF EXISTS kafka.order_info");
tableEnv.executeSql("CREATE TABLE kafka.order_info (\n" +
"id BIGINT,\n" +
"user_id BIGINT,\n" +
"create_time TIMESTAMP,\n" +
"operate_time TIMESTAMP,\n" +

Flink SQL job submission: only one of the SQL statements ever executes

2021-01-18 Posted by 花乞丐
I am using Flink CDC to read the MySQL binlog and send it to Kafka, then reading the Kafka messages with Flink and finally writing them into Hive. However, when I submit the code to YARN, I see that two jobs are submitted, and both of them execute insert into kafka.order_info; insert into ods.order_info never runs, and the program reports no errors at all. The code is below. Am I submitting the job the wrong way, or is something else the problem? Submit command:
flink run -m yarn-client -ynm mysql-cdc-2-hive -ys 3 -yjm 4g -ytm 8g -c com.zallsteel.flink.app.log.MySQLCDC2HiveApp -d /opt/tools/flink-1.12.0/zallsteel-realtime-etl-1.0-SNAPSHOT.jar

package com.zallsteel.flink.app.log;


import com.google.gson.Gson;
import com.zallsteel.flink.entity.ChangelogVO;
import com.zallsteel.flink.utils.ConfigUtils;

import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.text.ParseException;
import java.time.Duration;
import java.util.Properties;

/**
 * flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
 */
public class MySQLCDC2HiveApp {
public static void main(String[] args) {
// Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set parallelism
env.setParallelism(6);
// Enable checkpointing; the argument is the interval in milliseconds,
// so 6 ms is almost certainly a typo (the table config below sets 1 minute)
env.enableCheckpointing(6);

env.getConfig().setAutoWatermarkInterval(200);
// Set up the Flink SQL environment
EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// Create the table environment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
// Set the checkpointing mode
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
// Set the checkpoint interval
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));
// Catalog name
String catalogName = "devHive";
// Create the HiveCatalog
HiveCatalog hiveCatalog = new HiveCatalog(
catalogName,
"default",
"/etc/hive/conf"
);
// Register the Hive catalog
tableEnv.registerCatalog(catalogName, hiveCatalog);
// Use the Hive catalog
tableEnv.useCatalog(catalogName);
// Create a database for the MySQL CDC source table
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
// (Re)create the MySQL CDC source table
tableEnv.executeSql("DROP TABLE IF EXISTS cdc.order_info");
tableEnv.executeSql("CREATE TABLE cdc.order_info(\n" +
"id BIGINT,\n" +
"user_id BIGINT,\n" +
"create_time TIMESTAMP,\n" +
"operate_time TIMESTAMP,\n" +
"province_id INT,\n" +
"order_status STRING,\n" +
"total_amount DECIMAL(10, 5)\n" +
"  ) WITH (\n" +
"'connector' = 'mysql-cdc',\n" +
"'hostname' = 'hdp-xxx-dev-node01',\n" +
"'port' = '3306',\n" +
"'username' = 'root',\n" +
"'password' = 'phkC4DE4dM28$PUD',\n" +
"'database-name' = 'cdc_test',\n" +
"'table-name' = 'order_info'\n" +
")");
// Create the Kafka source
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
tableEnv.executeSql("DROP TABLE IF EXISTS kafka.order_info");
tableEnv.executeSql("CREATE TABLE kafka.order_info (\n" +
"id BIGINT,\n" +
"user_id BIGINT,\n" +
"create_time TIMESTAMP,\n" +
"operate_time TIMESTAMP,\n" +
"province_id INT,\n" +
"order_status STRING,\n" +
"total_amount DECIMAL(10, 5)\n" +
") WITH (\n" +
"'connector' = 'kafka',\n" +