MySQL JDBC source in a streaming application: the Execution reaches FINISHED and checkpoints can no longer be created

2021-01-21 Post
Hi!
I ran into a problem while testing: in a streaming application I use a MySQL JDBC source as a dimension table, and to improve processing efficiency I enabled the lookup cache. The registered table is:
bsTableEnv.executeSql("CREATE TABLE tm_dealers (dealer_code STRING,is_valid 
DECIMAL(10,0),proctime AS PROCTIME(),PRIMARY KEY (dealer_code) NOT ENFORCED\n" +
") WITH (" +
"'connector' = 'jdbc'," +
"'url' = 'jdbc:mysql://10.0.15.83:3306/flink-test?useSSL=false'," +
"'table-name' = 'tm_dealers'," +
"'driver' = 'com.mysql.cj.jdbc.Driver'," +
"'username' = 'root'," +
"'password' = 'Cdh2020:1'," +
"'lookup.cache.max-rows' = '500',"+
"'lookup.cache.ttl' = '1800s',"+
"'sink.buffer-flush.interval' = '60s'"+
")");


I found that with this setup the checkpoint configuration no longer takes effect: no checkpoint can be triggered, and the log reports the following error:

job bad9f419433f78d24e703e659b169917 is not in state RUNNING but FINISHED instead. Aborting checkpoint.

Looking at the Web UI, the Execution is in FINISHED state, and a FINISHED execution cannot be checkpointed. Is there any other way to handle this?
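A common cause of this symptom, offered as an assumption since the thread does not confirm it: if the JDBC table is consumed as a bounded scan (for example through a regular join), its source task finishes, and a job with FINISHED tasks cannot checkpoint in Flink 1.12. Using the table purely as a lookup table via a processing-time temporal join avoids the bounded scan; a minimal sketch, where the `orders` table and its columns are hypothetical stand-ins for the actual streaming input:

bsTableEnv.executeSql(
        // Hypothetical probe stream joined against tm_dealers by point lookups,
        // so no bounded JDBC scan task is created (orders needs its own proctime column).
        "SELECT o.order_id, d.is_valid " +
        "FROM orders AS o " +
        "JOIN tm_dealers FOR SYSTEM_TIME AS OF o.proctime AS d " +
        "ON o.dealer_code = d.dealer_code");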


Any guidance would be much appreciated, thanks!
刘海 <liuha...@163.com>

yarn-per-job mode: triggering a savepoint fails with an error

2021-01-20 Post
Hi,
I am testing savepoints. Running the command below fails with the error shown; it looks like a timeout, but there is no further information. If anyone knows the likely cause, please advise. Thanks!


Flink 1.12, yarn-per-job mode
jobID: fea3d87f138ef4c260ffe9324acc0e51
yarnID: application_1610788069646_0021
The command:

./bin/flink savepoint -t yarn-per-job -D yarn.application.id=application_1610788069646_0021 fea3d87f138ef4c260ffe9324acc0e51


The error:


org.apache.flink.util.FlinkException: Triggering a savepoint for the job fea3d87f138ef4c260ffe9324acc0e51 failed.
at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:712)
at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:690)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:919)
at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:687)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:989)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: java.util.concurrent.TimeoutException
at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1168)
at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:549)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
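The timeout above is raised on the client side (FutureUtils$Timeout inside CliFrontend), so one thing worth trying, as a suggestion rather than a confirmed fix, is raising the client-side timeout via Flink's client.timeout option (default 60 s) when triggering the savepoint, since a large state can easily take longer than the default:

./bin/flink savepoint -t yarn-per-job -D yarn.application.id=application_1610788069646_0021 -D client.timeout=600s fea3d87f138ef4c260ffe9324acc0e51

Whether the savepoint ultimately succeeded on the cluster side should also be visible in the JobManager log.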


Best regards!
刘海 <liuha...@163.com>

After setting the state storage location, the state data cannot be found once the job is running

2021-01-20 Post
Hi all,
I have a question I hope someone can answer:
I set the state storage location with env.setStateBackend(new RocksDBStateBackend("file:///data/flink/checkpoints")), but once the job is running I cannot find any state data.

Flink 1.12, yarn per-job mode; my configuration is below. After the job starts, the directory /data/flink/checkpoints does not exist on the server. Since I configured a state storage location, shouldn't state data appear there as soon as the job is running?


import java.time.Duration;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkTestDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(6);
        env.getConfig().setAutoWatermarkInterval(200);
        env.setStateBackend(new RocksDBStateBackend("file:///data/flink/checkpoints"));
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

        bsTableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        bsTableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(5));

        Configuration configuration = bsTableEnv.getConfig().getConfiguration();
        configuration.setString("table.exec.mini-batch.enabled", "true");
        configuration.setString("table.exec.mini-batch.allow-latency", "6000");
        configuration.setString("table.exec.mini-batch.size", "5000");

刘海 <liuha...@163.com>

Flink SQL: computed results written to HBase are inconsistent

2021-01-19 Post
 +
"CAST(COUNT(CASE WHEN C.data_source = '20' OR C.data_source = '100' THEN 1 END) AS STRING) AS QCZJWXZBXS," +
"CAST(COUNT(CASE WHEN C.data_source = '40' OR C.data_source = '30' THEN 1 END) AS STRING) AS YCWXZBXS," +
"CAST(COUNT(CASE WHEN C.data_source = '10' OR C.data_source = '80' THEN 1 END) AS STRING) AS TPYWXZBXS," +
"CAST(COUNT(CASE WHEN C.data_source = '60' OR C.data_source = '50' THEN 1 END) AS STRING) AS AKWXZBXS," +
"CAST(COUNT(CASE WHEN C.data_source = '130' OR C.data_source = '140' THEN 1 END) AS STRING) AS DCDWXZBXS " +
"FROM new_clue_list_cdc AS C " +
"WHERE C.fail_apply_time IS NOT NULL AND C.clue_fail_type = 20 " +
"GROUP BY C.entity_code) AS t2 " +
"ON t1.dealer_code = t2.entity_code WHERE t1.is_valid=12781001)");

/**
 * Aggregation-layer HBase table: online-sales leads aggregated by source channel.
 */
bsTableEnv.executeSql("CREATE TABLE DWM_NETSALES_WXXSALYQDWDHZB_SS_I_HBASE (" +
"ROWKEY VARCHAR," +
"F1 ROW(ENTITY_CODE VARCHAR,ZXS VARCHAR, QCZJXS VARCHAR, QCZJSBXS VARCHAR, 
QCZJZXS VARCHAR,  YCXS VARCHAR," +
" YCSBXS VARCHAR, YCZXS VARCHAR, TPYXS VARCHAR, TPYSBXS VARCHAR, TPYZXS 
VARCHAR,  AKXS VARCHAR, AKSBXS VARCHAR," +
" AKZXS VARCHAR,  DCDXS VARCHAR, DCDSBXS VARCHAR, DCDZXS VARCHAR, SUM_TIME 
VARCHAR)," +
"PRIMARY KEY (ROWKEY) NOT ENFORCED " +
") WITH (" +
"'connector' = 'hbase-2.2'," +
"'table-name' = 'DWM:DWM_NETSALES_WXXSALYQDWDHZB_SS_I_HBASE'," +
"'zookeeper.quorum' = '10.0.15.83:2181'," +
"'zookeeper.znode.parent' = '/hbase'" +
// "'sink.buffer-flush.max-size' = '3mb',"+
   // "'sink.buffer-flush.max-rows' = '1000',"+
   // "'sink.buffer-flush.interval' = '3s'"+
")");

stmtSet.addInsertSql(
"INSERT INTO DWM_NETSALES_WXXSALYQDWDHZB_SS_I_HBASE " +
"SELECT " +
"ROWKEY,ROW(dealer_code,ZXS,QCZJXS,QCZJSBXS,QCZJZXS,YCXS,YCSBXS,YCZXS,TPYXS,TPYSBXS,"
 +
"TPYZXS,AKXS,AKSBXS,AKZXS,DCDXS,DCDSBXS,DCDZXS,SUM_TIME) as F1 " +
"FROM " +
"(SELECT CONCAT_WS('',SUBSTRING(MD5(t1.dealer_code) FROM 0  FOR 6 
),t1.dealer_code,FROM_UNIXTIME(UNIX_TIMESTAMP(),'MMDD')) AS ROWKEY," +
"t1.dealer_code," +
"IF(t2.ZXS IS NULL, '0',t2.ZXS) AS ZXS,"+
"IF(t2.QCZJXS IS NULL, '0',t2.QCZJXS) AS QCZJXS,"+
"IF(t2.QCZJSBXS IS NULL, '0',t2.QCZJSBXS) AS QCZJSBXS,"+
"IF(t2.QCZJZXS IS NULL, '0',t2.QCZJZXS) AS QCZJZXS,"+
"IF(t2.YCXS IS NULL, '0',t2.YCXS) AS YCXS,"+
"IF(t2.YCSBXS IS NULL, '0',t2.YCSBXS) AS YCSBXS,"+
"IF(t2.YCZXS IS NULL, '0',t2.YCZXS) AS YCZXS,"+
"IF(t2.TPYXS IS NULL, '0',t2.TPYXS) AS TPYXS,"+
"IF(t2.TPYSBXS IS NULL, '0',t2.TPYSBXS) AS TPYSBXS,"+
"IF(t2.TPYZXS IS NULL, '0',t2.TPYZXS) AS TPYZXS,"+
"IF(t2.AKXS IS NULL, '0',t2.AKXS) AS AKXS,"+
"IF(t2.AKSBXS IS NULL, '0',t2.AKSBXS) AS AKSBXS,"+
"IF(t2.AKZXS IS NULL, '0',t2.AKZXS) AS AKZXS,"+
"IF(t2.DCDXS IS NULL, '0',t2.DCDXS) AS DCDXS,"+
"IF(t2.DCDSBXS IS NULL, '0',t2.DCDSBXS) AS DCDSBXS,"+
"IF(t2.DCDZXS IS NULL, '0',t2.DCDZXS) AS DCDZXS,"+
"FROM_UNIXTIME(UNIX_TIMESTAMP()) AS SUM_TIME " +
"FROM " +
"tm_dealers AS t1 " +
"LEFT JOIN " +
"(SELECT " +
"t.entity_code," +
"CAST(count(*) AS STRING) AS ZXS," +
"CAST(SUM(t.QCZJ_XS_LM) AS STRING) AS QCZJXS," +
"CAST(SUM(t.QCZJ_SB_LM) AS STRING) AS QCZJSBXS," +
"CAST(SUM(t.QCZJ_XS_LM)+SUM(t.QCZJ_SB_LM) AS STRING) AS QCZJZXS,"+
"CAST(SUM(t.YC_XS_LM) AS STRING) AS YCXS," +
"CAST(SUM(t.YC_SB_LM) AS STRING) AS YCSBXS," +
"CAST(SUM(t.YC_XS_LM)+SUM(t.YC_SB_LM) AS STRING) AS YCZXS,"+
"CAST(SUM(t.TPY_XS_LM) AS STRING) AS TPYXS," +
"CAST(SUM(t.TPY_SB_LM) AS STRING) AS TPYSBXS," +
"CAST(SUM(t.TPY_XS_LM)+SUM(t.TPY_SB_LM) AS STRING) AS TPYZXS,"+
"CAST(SUM(t.AK_XS_LM) AS STRING) AS AKXS," +
"CAST(SUM(t.AK_SB_LM) AS STRING) AS AKSBXS," +
"CAST(SUM(t.AK_XS_LM)+SUM(t.AK_SB_LM) AS STRING) AS AKZXS,"+
"CAST(SUM(t.DCD_XS_LM) AS STRING) AS DCDXS," +
"CAST(SUM(t.DCD_SB_LM) AS STRING) AS DCDSBXS," +
"CAST(SUM(t.DCD_XS_LM)+SUM(t.DCD_SB_LM) AS STRING) AS DCDZXS "+
"FROM " +
"(SELECT " +
"C.entity_code," +
"COUNT(*) AS NUM_2_LM," +
"COUNT( CASE WHEN C.data_source = '20' THEN 1 END ) AS QCZJ_XS_LM," +
"COUNT( CASE WHEN C.data_source = '100' THEN 1 END ) AS QCZJ_SB_LM," +
"COUNT( CASE WHEN C.data_source = '40' THEN 1 END ) AS YC_XS_LM," +
"COUNT( CASE WHEN C.data_source = '30' THEN 1 END ) AS YC_SB_LM," +
"COUNT( CASE WHEN C.data_source = '10' THEN 1 END ) AS TPY_XS_LM," +
"COUNT( CASE WHEN C.data_source = '80' THEN 1 END ) AS TPY_SB_LM," +
"COUNT( CASE WHEN C.data_source = '60' THEN 1 END ) AS AK_XS_LM," +
"COUNT( CASE WHEN C.data_source = '50' THEN 1 END ) AS AK_SB_LM," +
"COUNT( CASE WHEN C.data_source = '130' THEN 1 END ) AS DCD_XS_LM," +
"COUNT( CASE WHEN C.data_source = '140' THEN 1 END ) AS DCD_SB_LM  " +
"FROM " +
"new_clue_list_cdc AS C  " +
"WHERE " +
"((C.clue_level IN ('13101007','13101006')) OR C.clue_level IN 
('13101001','13101002','13101003','13101004')) " +
"GROUP BY " +
"C.entity_code,C.customer_no " +
") AS t " +
"GROUP BY t.entity_code ) AS t2 ON t1.dealer_code = t2.entity_code  WHERE 
t1.is_valid=12781001)");

stmtSet.execute();

}

}
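One pattern in the INSERT above that could plausibly explain inconsistent rows in HBase, offered as an assumption since the thread does not state a root cause: the ROWKEY includes FROM_UNIXTIME(UNIX_TIMESTAMP(),'MMDD'), and UNIX_TIMESTAMP() is re-evaluated for every emitted row, so when the continuously updating aggregate retracts and re-emits a result across a format-unit boundary, the update can land under a different rowkey than the row it is meant to replace. A rowkey built only from deterministic grouping columns avoids this; a minimal sketch of the changed line (SUBSTRING made 1-based, the write time kept only in the ordinary SUM_TIME column):

// Hedged sketch: deterministic rowkey (hash prefix + dealer_code only), with the
// non-deterministic timestamp left out of the key.
"(SELECT CONCAT_WS('', SUBSTRING(MD5(t1.dealer_code) FROM 1 FOR 6), t1.dealer_code) AS ROWKEY," +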
刘海 <liuha...@163.com>

Re: yarn Per-Job Cluster mode: memory options passed via the CLI have no effect

2021-01-17 Post
One more question: I already have one job running. When I submit another job, the output below is printed, but checking YARN shows the job never started. Has anyone run into this?


[root@cdh1 flink-1.12.0]# ./bin/flink run -d -t yarn-per-job -D jobmanager.memory.process.size=1.5GB -D taskmanager.memory.process.size=3GB -D heartbeat.timeout=180 /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-1.12.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
[root@cdh1 flink-1.12.0]#
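The client returned to the prompt without printing a JobID or a YARN application ID, which suggests the submission never got as far as creating a YARN application. As a first check (standard YARN tooling, not something confirmed in this thread), listing applications shows whether a second app was created at all and whether it is stuck waiting for resources:

yarn application -list -appStates ALL

If the second application appears as ACCEPTED but never reaches RUNNING, the YARN queue most likely lacks capacity for another per-job cluster of this size.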




刘海 <liuha...@163.com>
On 2021-01-18 10:42, 刘海 wrote:
It is a path on my local server. Do I need to upload the jar to all three nodes?

I put it under /opt/flink-1.12.0/examples.


刘海 <liuha...@163.com>
On 2021-01-18 10:38, Yangze Guo wrote:
Is that a path on your local machine? The client must be able to find the jar at that path.

Best,
Yangze Guo

On Mon, Jan 18, 2021 at 10:34 AM 刘海  wrote:

Hi,
I tried your suggestion and changed the command to:

./bin/flink run -d -t yarn-per-job -tm 1536 -jm 3072 -D jobmanager.memory.process.size=1.5GB -D taskmanager.memory.process.size=3GB -D heartbeat.timeout=180 /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar

I used an absolute path for the jar: /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar


The result is a "jar not found" exception:
org.apache.flink.client.cli.CliArgsException: Could not get job jar and dependencies from JAR file: JAR file does not exist: 1536
at org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:259) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_181]
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) [hadoop-common-3.0.0-cdh6.3.2.jar:?]
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) [flink-dist_2.11-1.12.0.jar:1.12.0]
Caused by: java.io.FileNotFoundException: JAR file does not exist: 1536
at org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:793) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:256) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
... 8 more


刘海 <liuha...@163.com>
On 2021-01-18 10:12, Yangze Guo wrote:
Hi, use -D / -tm / -jm without the "y" prefix.

Best,
Yangze Guo





Re: yarn Per-Job Cluster mode: memory options passed via the CLI have no effect

2021-01-17 Post


刘海 <liuha...@163.com>
On 2021-01-18 09:15, 刘海 wrote:
Hi Dear All,
A question about my setup:
1. I am on Flink 1.12;
2. a three-node Hadoop cluster built with CDH 6.3.2, using the YARN that ships with CDH;
3. run mode: Per-Job Cluster on YARN (three nodes, each with 48 cores and 64 GB of RAM);
4. below is the flink-conf.yaml used on my three nodes; apart from jobmanager.rpc.address, all three are configured identically:


#==
# Common settings
#==
jobmanager.rpc.address: cdh1


# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123
# The total process memory size for the JobManager.
#
# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
jobmanager.memory.process.size: 2048m


# The total process memory size for the TaskManager.
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
taskmanager.memory.process.size: 6144m


# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
# taskmanager.memory.flink.size: 1280m
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
# Number of slots per TaskManager (default: 1). Each slot runs one task or pipeline; multiple slots per TaskManager
# help amortize fixed per-process overheads (JVM, application libraries, network connections) across parallel tasks or pipelines.
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify any other parallelism.
# Default parallelism used when none is specified anywhere (default: 1)
parallelism.default: 1
# Set the following to pin the TaskManager address; for a single-machine deployment use localhost
#taskmanager.host: 0.0.0.0
# The default file system scheme and authority.
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme


#==
# High Availability
#==
# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
high-availability: zookeeper
# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...)
high-availability.storageDir: hdfs:///flink/ha/
# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
high-availability.zookeeper.client.acl: open
high-availability.zookeeper.path.root: /flink
#==
# Fault tolerance, checkpointing and state backends
#==
state.backend: rocksdb
# Whether the state backend should create incremental checkpoints where possible (default: false). With incremental
# checkpoints, only the diff from the previous completed checkpoint is stored rather than the full checkpoint state.
# When enabled, the state size shown in the Web UI or fetched from the REST API represents the incremental checkpoint
# size, not the full checkpoint size. Some state backends do not support incremental checkpoints and ignore this option.
state.backend.incremental: true
# Whether local recovery is enabled for the state backend. Disabled by default. Local recovery currently only covers
# keyed state backends; the MemoryStateBackend does not support it.
state.backend.local-recovery: true
# RocksDB block cache size in bytes. RocksDB's default block cache size is 8MB.
state.backend.rocksdb.block.cache-size: 268435456
# Factory for the timer-service state implementation. For the RocksDB backend the options are HEAP (heap-based) or ROCKSDB.
state.backend.rocksdb.timer-service.factory: HEAP
# Directory for checkpoints filesystem, when using any of the default bundled state backends:
# the default directory for checkpoint data files and metadata on a Flink-supported filesystem.
state.checkpoints.dir: hdfs:///flink/flink-checkpoints
# Default target directory for savepoints, optional. Used by state backends that write savepoints to a filesystem.
state.savepoints.dir: hdfs:///flink/flink-savepoints
# Maximum number of completed checkpoints to retain
state.checkpoints.num-retained: 3
# How a job recovers from task failures. Accepted values:
# 'full': restart all tasks to recover the job.
# 'region': restart only the tasks that may be affected by the failure.
jobmanager.execution.failover-strategy: region
#==
# Advanced
#==


# Override the directories for temporary files. If not specified, the
# system-specific Java temporary directory (java.io.tmpdir property) is taken.
#
# For framework setups on Yarn or Mesos, Flink will automatically pick up the
# containers' temp directories without any need for configuration.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
# /data1/tmp:/dat

yarn Per-Job Cluster mode: memory options passed via the CLI have no effect

2021-01-17 Post
# Note: Each directory entry is read from and written to by a different I/O
# thread. You can include the same directory multiple times in order to create
# multiple I/O threads against that directory. This is for example relevant for
# high-throughput RAIDs.
#
# io.tmp.dirs: /tmp


# The classloading resolve order. Possible values are 'child-first' (Flink's 
default)
# and 'parent-first' (Java's default).
#
# Child first classloading allows users to use different dependency/library
# versions in their application than those in the classpath. Switching back
# to 'parent-first' may help with debugging dependency issues.
#
# classloader.resolve-order: child-first


# The amount of memory going to the network stack. These numbers usually need 
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, the default max is 1GB.
# 
# taskmanager.memory.network.fraction: 0.1
# taskmanager.memory.network.min: 64mb
# taskmanager.memory.network.max: 1gb




#==
# YARN Configuration
#==
# Number of ApplicationMaster restarts. The default is 1, or 2 when high availability is enabled.
# The restart count is also bounded by YARN (configured via yarn.resourcemanager.am.max-attempts).
# Note that the entire Flink cluster restarts and the YARN client loses the connection.
yarn.application-attempts: 10
#yarn.container-start-command-template: %java% %jvmmem% %jvmopts% -DyarnContainerId=$CONTAINER_ID %logging% %class% %args% %redirects%
#yarn.maximum-failed-containers: 100
#yarn.tags: flink


#==
# HistoryServer
#==
heartbeat.timeout: 180




My question:
After submitting the flink job with

./bin/flink run \
-d -t yarn-per-job \
-yjm 1536 \
-ytm 3072 \
-yD jobmanager.memory.process.size=1.5GB \
-yD taskmanager.memory.process.size=3GB \
-yD heartbeat.timeout=180 \
/opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.0.jar

the memory options given on the command line are not applied: the memory observed in the Flink Web UI is the size configured in flink-conf.yaml, and the CLI options have no effect. Am I using this incorrectly?
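For reference, and as an inference from how the two Flink 1.12 CLIs differ rather than something confirmed in this thread: the -yjm/-ytm/-yD options belong to the legacy YARN command line (flink run -m yarn-cluster), while the generic executor selected by -t yarn-per-job only honors -D dynamic properties; that is also why the -tm 1536 attempt elsewhere in this thread failed with "JAR file does not exist: 1536". A sketch of the submission using only -D options (note heartbeat.timeout is in milliseconds, so 180 almost certainly wants to be 180000):

./bin/flink run -d -t yarn-per-job \
-Djobmanager.memory.process.size=1536m \
-Dtaskmanager.memory.process.size=3g \
-Dheartbeat.timeout=180000 \
/opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.0.jar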




Best regards!
刘海







Re: Row function cannot have column reference through table alias

2021-01-10 Post
Using table_alias.column inside ROW causes a parse error; I ran into the same issue with HBase before. I usually work around it by wrapping the query in a subquery, as sketched below.
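A minimal sketch of that workaround against the tables from the quoted message below (the inner query projects the columns first so that ROW() no longer sees alias-qualified references; offered as a pattern, not tested against this exact job):

INSERT INTO flink_log_sink
SELECT id, Row(app_id, message)
FROM (
  SELECT b.id, b.app_id, b.message
  FROM flink_log_source a
  JOIN flink_log_side b
  ON a.id = b.id
);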


刘海 <liuha...@163.com>
On 1/11/2021 11:04, 马阳阳 wrote:
We have a SQL statement that composes a row from a table's columns. The simplified SQL looks like this:
INSERT INTO flink_log_sink
SELECT
b.id,
Row(b.app_id, b.message)
FROM flink_log_source a
join flink_log_side b
on a.id = b.id;


When we submit the SQL to Flink, it cannot be parsed, failing with the following error message:
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "." at line 11, column 8.
Was expecting one of:
")" ...
"," ...

at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
at cn.imdada.bi.dfl2.core.operation.InsertIntoOperation.execute(InsertIntoOperation.java:35)
at cn.imdada.bi.dfl2.core.Main.execute(Main.java:172)
at cn.imdada.bi.dfl2.core.Main.main(Main.java:125)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:153)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:112)
at cn.imdada.bi.dfl2.launcher.yarn.YarnPerJobSubmitter.submit(YarnPerJobSubmitter.java:37)
at cn.imdada.bi.dfl2.launcher.LauncherMain.main(LauncherMain.java:127)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line 11, column 8.
Was expecting one of:
")" ...
"," ...

at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:442)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:205)
at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155)
at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180)
at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
... 15 more
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 11, column 8.
Was expecting one of:
")" ...
"," ...

at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39525)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39336)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:24247)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:19024)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:18680)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:18721)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:18652)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11656)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10508)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10495)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:7115)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:684)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:18635)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:18089)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:558)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.RichSqlInsert(FlinkSqlParserImpl.java:5709)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3342)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3882)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:253)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153)
... 17 more


Is this a bug or the expected behavior? If this is the expected behavior, what can we do to avoid it?


PS:
I tried to create a view to represent the join result,  

Re: Flink 1.12 error OSError: Expected IPC message of type schema but got record batch

2021-01-03 Post
This is probably memory-related. I have seen it before: the stored state grew without bound, the job failed after a few minutes of running and threw an exception. You could try increasing memory and cleaning up state; one way to expire idle state is sketched below.
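As an illustration of the "clean up state" suggestion (a generic Table API pattern; the asker is presumably on PyFlink, whose TableConfig exposes a corresponding setting, so this Java form is only a sketch): idle state retention lets Flink drop per-key state that has not been touched for a while:

// Drop per-key state that has been idle for between 1 and 2 hours; the
// durations are placeholders to be tuned for the actual job.
tableEnv.getConfig().setIdleStateRetentionTime(
        org.apache.flink.api.common.time.Time.hours(1),
        org.apache.flink.api.common.time.Time.hours(2));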


刘海 <liuha...@163.com>
On 2021-01-04 11:35, 咿咿呀呀 <201782...@qq.com> wrote:
I changed it following your suggestion, and it behaves the same as before: it runs and the output is correct, but the biggest problem remains that after a while (about 3 minutes) it fails with OSError: Expected IPC message of type schema but got record batch.


