Re: Flink checkpoint is very slow - troubleshooting

2021-06-07 Post by Jacob
Okay, understood. Thanks for all the guidance recently!



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink checkpoint is very slow - troubleshooting

2021-06-07 Post by Jacob
@nobleyd
Thanks for the guidance. I was off the past two days and didn't check email, hence the late reply - sorry.

I later changed the code to roughly the shape below. Checkpoint times did improve, and the job has been running normally for several days.

The thread pool I provide runs inside asyncInvoke. Is that inappropriate? Isn't asyncInvoke itself already the asynchronous entry point, so that no separate thread pool is needed and the processing logic could simply be written directly inside asyncInvoke?

public class AsyncProcessFunction extends RichAsyncFunction<Map<String, String>, List<String>> {

    private transient ExecutorService executorPool;

    @Override
    public void open(Configuration parameters) throws Exception {
        executorPool = Executors.newFixedThreadPool(80);
    }

    @Override
    public void asyncInvoke(Map<String, String> message,
                            ResultFuture<List<String>> resultFuture) {
        executorPool.submit(() -> {
            // processing logic
            ..
            resultFuture.complete(Collections.singletonList(...));
        });
    }
}
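
For reference, asyncInvoke is not itself run on a separate thread pool by Flink: it is called on the task thread and is expected to return quickly, so blocking redis/hbase calls do need to be handed off to an executor or to an async client, as in the code above. Below is a minimal, hedged sketch of attaching such a function to a stream (the timeout and capacity are made-up illustrative values, not taken from this thread):

// Sketch, assuming Flink 1.11+.
DataStream<Map<String, String>> input = ...;   // the upstream stream from this thread (elided)
DataStream<List<String>> enriched = AsyncDataStream.unorderedWait(
        input,
        new AsyncProcessFunction(),
        60, TimeUnit.SECONDS,   // timeout per async request (illustrative value)
        100);                   // max in-flight requests (illustrative value)
// AsyncDataStream.orderedWait(...) takes the same arguments and preserves input order.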



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink checkpoint is very slow - troubleshooting

2021-06-04 Post by Jacob
thanks,

I've read the relevant documentation [1]. The interactions with redis and hbase are fairly numerous and scattered - not only lookups, but also writes back to redis.

I plan to take the whole logic of the previous map operator and run it inside asyncInvoke() via a thread pool. I'm not sure whether that's appropriate; wouldn't the ordering of the data then no longer be guaranteed?



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/asyncio/
  



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink checkpoint is very slow - troubleshooting

2021-06-04 Post by Jacob
Yes, your description is correct - that's roughly how the job executes.

I see what you mean now.

Thanks for the suggestion. I need to study this async operator; I've never used it before and am not clear what the flow actually looks like. Do you have a related demo, or which part of the docs should I read specifically?





-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink checkpoint is very slow - troubleshooting

2021-06-04 Post by Jacob
@nobleyd Thanks for the reply.

"Are the redis and hbase accesses in job A asynchronous or synchronous?" --- Synchronous.

"You're presumably using aligned checkpoints, right?" --- Yes.

The accesses are synchronous because we need to produce new data promptly; with asynchronous access we could not get the latest results immediately.

I just switched the checkpoints to unaligned. Judging from the ten checkpoints completed since, the state size has indeed grown, but checkpoints have not gotten faster yet.

The message volume is indeed large and the processing logic fairly complex. I set the parallelism of the processing operator to 100, and the source parallelism equals the number of topic partitions.
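
As a side note, a minimal sketch of switching to unaligned checkpoints in code (assuming Flink 1.11+; the 30s interval mirrors the setup described earlier in this thread):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30_000, CheckpointingMode.EXACTLY_ONCE);
// Unaligned checkpoints let barriers overtake in-flight records under backpressure,
// which usually shortens checkpoint duration at the cost of larger checkpoint state.
env.getCheckpointConfig().enableUnalignedCheckpoints();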



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink checkpoint is very slow - troubleshooting

2021-06-03 Post by Jacob
@JasonLee Thanks for the reply.

The backpressure situation of job A is shown in the image below:

<http://apache-flink.147419.n8.nabble.com/file/t1162/backpressure.png> 

I am aware that the job really cannot process data as fast as messages are produced, but for now I cannot think of a reasonable fix; the parallelism is already set quite high (it has been raised from equal to the number of topic partitions to greater than the partition count).

I set all tasks to the same parallelism so that they are chained into one task, which avoids thread-switching overhead. The Map operator is the most expensive one - all of the logic and data enrichment happens there; the Flat Map operators before and after it merely flatten List data and do nothing costly. Their parallelism was initially 25 (the topic01 partition count; the message rate is 1000-2000 records/s) and was later raised straight to 80, with no noticeable effect.





-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Reply: Flink checkpoint is very slow - troubleshooting

2021-06-03 Post by Jacob
@lian Thanks for the reply.

I checked through the web UI; there is no backpressure.
Hbase performance is indeed somewhat of a problem - the company's hbase has never performed well - but in my program the redis cache is read first; most records are found there and only a small portion falls through to hbase.
Thanks for the suggestion; I will look at this at the code level and see whether the related logic can be optimized.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink checkpoint is very slow - troubleshooting

2021-06-03 Post by Jacob
Dear all,

I have two Flink jobs, A and B.

Job A consumes data from kafka topic01, runs it through a series of processing logic, and finally sinks the result to topic02.

Roughly, the processing is: after a topic01 message is consumed, redis and hbase are queried based on fields in the data, the business record is assembled (a fairly complex process), and the business record is then written to topic02. A checkpoint is taken every 30s; the state is only a few tens of KB, yet one checkpoint takes two minutes on average, so topic01 messages pile up and latency increases.

Job B is simple: it consumes the business data on topic02 from the previous step, aggregates it in a 30-minute window (keyBy, max), and writes the result to an ORC
file. Its state is several hundred MB, but its checkpoints take only seconds.

What puzzles me is why job A's state is so small yet its checkpoints take so long, and I don't know from what angle to optimize this.

Any advice is appreciated.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink 1.11.2 SQL consuming kafka and writing to Hive fails

2021-05-31 Post by Jacob
The problem is solved.

The kafka connector jar needs to be placed under lib in the Flink home.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 1.11.2 SQL consuming kafka and writing to Hive fails

2021-05-31 Post by Jacob
Thank you for your reply!

Is the kafka connector you mention the *flink-connector-kafka_2.11*
dependency? That one is used by the Datastream API, and I have already added the *flink-sql-connector-kafka_2.11* dependency to the pom.
I tried adding *flink-connector-kafka_2.11* as well, but it still fails.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink 1.11.2 SQL consuming kafka and writing to Hive fails

2021-05-31 Post by Jacob
nk-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:524)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
at com.newegg.flink.sqlkafka.Main.main(Main.java:66)
~[flink-0.0.1-SNAPSHOT.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_191-ojdkbuild]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_191-ojdkbuild]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_191-ojdkbuild]
at java.lang.reflect.Method.invoke(Method.java:498)
~[?:1.8.0_191-ojdkbuild]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
... 10 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find
any factory for identifier 'kafka' that implements
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
classpath.




-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Putting Flink's unified stream and batch processing into practice

2021-05-10 Post by Jacob
Dear All,

*[Business scenario]*

Step 1: Flink consumes kafka, processes the messages, produces an increment every half hour, and writes that half hour of data to one ORC file on HDFS.
Step 2: The file written to hdfs by the Flink job in step 1 is loaded into a hive table, joined with the full data set (a hive table), the join result is grouped by a certain field, and then written into several hive tables with identical structure. Once this finishes, the incremental file is deleted.

*[Problems]*

1. The two steps above are two separate code bases to maintain, which makes development and maintenance cumbersome.

2. The second step relies on hadoop MapReduce. The incremental-vs-full join is slow and resource hungry. Worse, if there are no resources when step 2 runs, the join result cannot be produced within half an hour, so the incremental file is not deleted in time; the step-1 Flink job then keeps producing incremental files, and once the MapReduce join job finally gets resources it has to merge several incremental files before joining with the full data, which drags the whole process out even further.

3. Timeliness suffers. What is logically one pipeline is split into two steps, jobs stack up, and the next scheduled job can only run after the previous Flink job has finished writing to hdfs, which degrades the original real-time behavior.

[Question]

Can the two steps above be merged into one? This scenario should count as a workload where streaming and batch coexist; can Flink's "unified stream and batch" achieve it?

Since the execution environment of the step-1 Flink job is a StreamExecutionEnvironment, while running the incremental/full join with Flink SQL would mean a separate TableEnvironment, I am not sure how to build both environments in the same job. Also, the join result has to be written into several different hive tables, which necessarily means running multiple sql statements, yet the demos I have seen
run one sql statement at a time. Is this requirement achievable?
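
Not an authoritative answer, but a minimal sketch of the direction being asked about, assuming Flink 1.11+: a StreamTableEnvironment created from the StreamExecutionEnvironment lets DataStream and Table/SQL code live in one job, and a StatementSet groups several INSERT statements into a single submission. The table names and fields below are hypothetical placeholders, not ones from this thread.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class StreamAndSqlInOneJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Stand-in for the real kafka/processing pipeline of step 1.
        DataStream<String> increment = env.fromElements("a", "b", "c");
        tableEnv.createTemporaryView("increment_view", increment);

        // Several INSERT statements submitted together as one job graph;
        // the target tables would have to exist in a registered (e.g. Hive) catalog.
        StatementSet stmtSet = tableEnv.createStatementSet();
        stmtSet.addInsertSql("INSERT INTO hive_table_a SELECT f0 FROM increment_view");
        stmtSet.addInsertSql("INSERT INTO hive_table_b SELECT f0 FROM increment_view");
        stmtSet.execute();
    }
}

Whether this can fully replace the MapReduce join of step 2 would still have to be verified against the Hive catalog setup described elsewhere in this archive.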



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink job fails to consume kafka, cannot obtain offsets

2021-04-13 Post by Jacob
A flink job consumes messages from a kafka topic that exists on two kafka clusters, cluster A and cluster B. Consuming the topic on cluster A works fine, but switching to cluster B makes the job fail at startup.

The Flink cluster is deployed with Docker in standalone mode. Cluster resources and slots are sufficient. The error log is as follows:

java.lang.Exception: org.apache.kafka.common.errors.TimeoutException:
Timeout of 6ms expired before the position for partition Test-topic-27
could be determined
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of
6ms expired before the position for partition Test-topic-27 could be
determined

Searching around, most answers point to insufficient slots and the like, or to kafka broker load; those causes have already been ruled out.

Please advise.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink fails to write ORC

2021-03-30 Post by Jacob
Using the Flink API to consume kafka messages and write ORC files, the following error occurs:
Caused by: org.apache.flink.util.SerializedThrowable
at java.lang.System.arraycopy(Native Method) ~[?:1.8.0_191-ojdkbuild]
at org.apache.hadoop.io.Text.set(Text.java:225) ~[test456.jar:?]
at 
org.apache.orc.impl.StringRedBlackTree.add(StringRedBlackTree.java:59)
~[test456.jar:?]
at
org.apache.orc.impl.writer.StringTreeWriter.writeBatch(StringTreeWriter.java:70)
~[test456.jar:?]
at
org.apache.orc.impl.writer.MapTreeWriter.writeBatch(MapTreeWriter.java:104)
~[test456.jar:?]
at
org.apache.orc.impl.writer.StructTreeWriter.writeRootBatch(StructTreeWriter.java:56)
~[test456.jar:?]
at org.apache.orc.impl.WriterImpl.addRowBatch(WriterImpl.java:557)
~[test456.jar:?]
at 
org.apache.flink.orc.writer.OrcBulkWriter.flush(OrcBulkWriter.java:66)
~[test456.jar:?]
at
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:59)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:226)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.prepareBucketForCheckpointing(Bucket.java:259)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onReceptionOfCheckpoint(Bucket.java:240)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotActiveBuckets(Buckets.java:245)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotState(Buckets.java:236)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(StreamingFileSinkHelper.java:86)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.snapshotState(StreamingFileSink.java:415)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
... 13 more


The rough cause seems to be an error while writing a map-typed value, but I don't know exactly where it goes wrong.

I found a similar error:
https://stackoverflow.com/questions/55246512/error-writing-to-orcnewoutputformat-using-mapr-multipleoutputs

I'm not sure what causes this error.
The data is known to be neither empty nor null.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink consumes kafka and writes ORC files

2021-03-24 Post by Jacob
Thanks for the reply.

A simple BucketAssigner implementation meets the requirement:


@Override
public String getBucketId(Map<String, String> element, Context context) {
    if (context.timestamp() - context.currentProcessingTime() < 0) {
        return "dt=" + context.timestamp();
    }
    return null;
}
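
For completeness, a hedged sketch of how such an assigner is plugged into the sink (assuming Flink 1.11/1.12; the path, the writer factory, the stream variable, and the MyOrcBucketAssigner class name are hypothetical placeholders for the pieces discussed in this thread):

// orcWriterFactory: an OrcBulkWriterFactory built elsewhere (assumed to exist).
StreamingFileSink<Map<String, String>> sink = StreamingFileSink
        .forBulkFormat(new Path("hdfs:///test/realtime"), orcWriterFactory)
        .withBucketAssigner(new MyOrcBucketAssigner())  // yields "dt=<timestamp>" directories
        .build();
stream.addSink(sink);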



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink consumes kafka and writes ORC files

2021-03-22 Post by Jacob
[Current situation]

A Flink job consumes kafka messages, runs a series of aggregations over each half hour of consumed messages (Flink window aggregation), and then writes them to one ORC file.
As far as I know, Flink offers two bucket assignment policies for writing ORC [1]:

One is time based, i.e. ORC files are created under per-time directories. [test/realtime/ is the root directory]

test/realtime/
└── 2021-03-23--07
├── part-0-0.orc
├── part-0-1.orc
└── 2021-03-23--08
├── part-0-0.orc
├── part-0-1.orc

The other puts all part files in a single directory:

test/realtime/
├── part-0-0.orc
├── part-0-1.orc
├── part-0-2.orc
├── part-0-3.orc

[Problem]

The end goal is to load each half hour's ORC file into hive as a partition; the hive table's partition column is dt and its value is a timestamp, e.g.:

hive> show partitions table_demo;
OK
dt=161645580
dt=161645760
dt=161645940
dt=161646121
dt=161646301
Time taken: 0.134 seconds, Fetched: 5 row(s)

So the directory holding each ORC file should ideally be named in the form dt=`timestamp`:

<http://apache-flink.147419.n8.nabble.com/file/t1162/dir.png> 

After implementing this with flink, I found that neither bucket assignment policy can satisfy the requirement above.

How can this be achieved? Previously I wrote the aggregation and ORC-writing code myself, so directory and file names were fully under my control; with flink's built-in functionality it no longer seems easy to meet this requirement.

[1].https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/file_sink.html#%E6%A1%B6%E5%88%86%E9%85%8D



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: How to integrate Flink jobs into our own system for easier management

2021-03-08 Post by Jacob
Thanks for the answers.
I'll first try submitting jobs with classes such as YarnClusterDescriptor and see whether that works out in practice.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: How to integrate Flink jobs into our own system for easier management

2021-03-08 Post by Jacob
Thanks for the pointer - following the "program against the APIs" idea I have just found some articles and demos.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: How to integrate Flink jobs into our own system for easier management

2021-03-08 Post by Jacob
Thanks for the reply!

Sorry for the late response; I was tied up the past two days.

My flink jobs run on a hadoop cluster, i.e. in On Yarn mode.
Regarding what you said:

1. [Program against the FLINK and YARN or k8s APIs, manage metadata, manage user files, support submitting jobs and then managing job state] - does this mean the relevant APIs can be used to read a jar and submit the job? And the target cluster is passed into the code via configuration parameters - is that roughly the process? Is there any documentation or demo for this? I have not been able to find anything online.

2. [Flink itself has a Web front end that supports most of what you want] - I am aware of that and often open the web UI to look at logs. If it is integrated with our own system, does that mean embedding those pages as hyperlinks, so that clicking a button on our system's dashboard jumps to a particular module of the flink web UI?



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

How to integrate Flink jobs into our own system for easier management

2021-03-06 Post by Jacob
We currently submit Flink jobs with the flink client's run command for real-time computation. Every submission requires logging on to the prd machine and uploading the jar, which is cumbersome.

We plan to integrate the real-time computation tasks into an existing system of ours, wrapping the process described above and giving users buttons, menus, and so on. Ideally, by adding a few modules and menus to that system we could handle all job maintenance: submitting jobs, viewing running jobs, stopping jobs, and so forth.

The system in question is a data-processing platform we developed in house, and real-time computation is one link in it, so we would like to integrate the real-time job module into it as well.

I don't know whether this can be done.

Any pointers would be appreciated. Thanks!



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Reply: Asking about slow Flink checkpoints

2021-03-02 Post by Jacob
Thanks for the reply.

Looking at several of the slow checkpoints, most of the time is spent in the async phase. If that is the case, the slowness at those moments should be caused by the network, right?
I still feel the disk has something to do with it, though.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Reply: Asking about slow Flink checkpoints

2021-03-01 Post by Jacob
Thanks for the reply.

I am using the filesystem state backend;
the relevant configuration is as follows:
  

state.backend: filesystem
state.checkpoints.dir: hdfs://nameservice1/datafeed/prd/flink_checkpoint
state.savepoints.dir: hdfs://nameservice1/datafeed/prd/flink_checkpoint
state.backend.incremental: false
state.backend.fs.memory-threshold: 1024
state.checkpoints.num-retained: 3
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 1000
restart-strategy.fixed-delay.delay: 30 s



Later I commented out the configuration above and set the checkpoint storage to memory in code, but checkpoints were still very slow.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Asking about slow Flink checkpoints

2021-02-26 Post by Jacob
Hi All,

A production job runs stably on hadoop cluster A with fast checkpoints (checkpoint interval 30s, each checkpoint a few tens of KB, each finishing in milliseconds).

The same job, with no code changes, was migrated to another hadoop cluster B, and checkpoints became extremely slow - more than ten minutes each - which cripples the job: most of the time and resources go into checkpointing rather than into our business logic.

As far as I know, the only difference between the two hadoop clusters is that cluster A uses SSDs while cluster B's machines use spinning disks.

The job's checkpoints are stored on hdfs. Could disk performance be the reason checkpoints on cluster B are so slow? Should memory be used as checkpoint storage instead? Please advise.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Integrating Flink jobs with our own platform

2021-02-02 Post by Jacob
There is a rather vague requirement, and I'm not sure it is reasonable.

Currently all of our real-time computation jobs run on a hadoop cluster in On Yarn mode. Every new job is submitted from the Flink client in the form ./bin/flink
run-application -t yarn-application ...

We now have a home-grown data-processing platform, and the flink job is one step of the processing. We are wondering whether a menu could be added to our system's portal to upload the flink project's jar and submit the job to the hadoop cluster, giving unified management so we no longer have to go to a flink client for every submission. Is this requirement reasonable?

I figure that if jobs are to be submitted from our own platform, the flink client would first have to be integrated into our system - otherwise how would the job be launched and run?

The requirement is rather vague; apologies to all.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: Flink parallelism question

2021-01-23 Post by Jacob
I get it now... thanks!! (Lately I keep falling into these basic misconceptions.) The more I think about it, the sillier the question seems.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: Flink parallelism question

2021-01-23 Post by Jacob
Thanks for the reply~

My understanding had been that the parallelism should not exceed the number of CPUs.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink parallelism question

2021-01-23 Post by Jacob
Thanks for the reply.

I'm not sure what you mean by saying the parallelism can be set freely.

When I submit a job in on-yarn mode with, say, a parallelism of 25, the VCores Used shown in the hadoop yarn portal is 25 - in other words, 25 CPUs were requested.

But in standalone mode (3 machines, 4 CPUs each), is the maximum parallelism I can set then 12? Going beyond that number produces errors along the lines of insufficient resources.

In practice, a flink cluster made up of these three machines can often obtain fewer than 12 available slots rather than 12. Is that because there is not enough spare CPU at the moment, so fewer than 12 slots can be acquired?






-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink on yarn JDK version support question

2021-01-23 Post by Jacob
Submitting a job to the hadoop cluster with the Flink 1.11.2 client fails with the following error:

LogType:jobmanager.err
Log Upload Time:Sat Jan 23 00:06:47 -0800 2021
LogLength:160
Log Contents:
Unrecognized VM option 'MaxMetaspaceSize=268435456'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

Is this because the hadoop cluster's JDK version is too old?

What is known so far is that the hadoop cluster's JDK version is 1.7.

I used to assume that the *env.java.home* configured in the flink configuration file should be the hadoop cluster's java home, but testing showed that it is not: it is the java home of the machine where the (local) client runs. That java version is already 1.8+, yet submitting the job still fails with the error above.

Is the cause the hadoop cluster's old JDK? If the cluster's JDK is upgraded, does env.java.home in the flink configuration file need to be changed to the hadoop cluster's java home, or can it stay unchanged, still pointing to the local java home path?

What role do these two JDKs play in launching a flink job (the configured env.java.home and the hadoop cluster's java home)?







-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink parallelism question

2021-01-22 Post by Jacob
A question has bothered me ever since I started using Flink.

Setting a parallelism of n in Flink means occupying n CPUs, not n CPU cores.

For instance, when Flink consumes a kafka topic, the recommended parallelism is usually the number of topic partitions, so that each parallel instance consumes one partition for best performance. That means one parallel instance corresponds to one consumer thread and also represents one slot; and since in Flink one parallel instance supposedly corresponds to one CPU, can one then say that one CPU is one thread?

If Flink is deployed in standalone mode on a single machine with 4 CPUs, each CPU having 6 cores, is the cluster's maximum parallelism then 4?

In my understanding one core corresponds to one thread, yet by the reasoning above it seems one CPU is one thread - isn't that contradictory?




-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink SQL: how to ensure multiple sql statements run in order

2021-01-06 Post by Jacob
Thanks for the reply.

That sounds doable.
I'll try this approach first.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 1.10 on yarn job submission error

2021-01-06 Post by Jacob
hi,

You can try the following first:

export HADOOP_USER_NAME=your user

export HADOOP_CONF_DIR=your hadoop conf dir

export HADOOP_CLASSPATH=`/opt/app/hadoop/bin/hadoop classpath`



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Applying Flink sql unified stream and batch processing

2021-01-05 Post by Jacob
Hi all,

Here is a scenario:

Consume kafka messages, process each kafka record one by one, and every 15 minutes write the different data into hive tables (several tables).
Afterwards, run a series of join/merge operations over those tables, write the results to new tables, and produce the final data.

If this scenario is handled with Flink, do two flink jobs need to be started - one for the streaming part and one for the batch part -
since the two execution environments differ?
Stream processing:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
Batch processing:
EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

Is it possible to merge these two parts and run them in the same job?



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink SQL: how to ensure multiple sql statements run in order

2021-01-05 Post by Jacob
Dear All,

In a Flink SQL job with multiple sql statements that must run in order (the next sql depends on the result of the previous one), how can *sequential execution* be guaranteed, given that tableEnv.executeSql(sql) submits *asynchronously*?

e.g. in a main function there is code like:

String sql1 = "";
tableEnv.executeSql(sql1);
String sql2 = "";
tableEnv.executeSql(sql2);

Question: how to ensure sql1 has finished executing before sql2 runs?



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink SQL> query of hive table returns all NULLs

2021-01-05 Post by Jacob
Thanks for the reply.

This problem had bothered me for a long time.
It is now solved.
The cause was that the field names specified when writing the ORC files were column0, column1, ..., column33,
while the hive table was created with the actual field names; the two did not match, so flink sql could not read the
data.
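
To illustrate the fix (a hedged sketch; the field names below are hypothetical, not the actual 34-column schema): the ORC schema's field names need to match the Hive table's column names.

// ORC writer schema with real column names instead of column0..columnN:
TypeDescription schema =
        TypeDescription.fromString("struct<id:string,name:string,country:string>");
// matching (hypothetical) Hive DDL:
//   CREATE TABLE table1 (id string, name string, country string) STORED AS ORC;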



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink SQL query of an ORC table returns all NULLs

2021-01-04 Post by Jacob
Flink SQL> select * from table1 where dt='1609739880002';

table1 is an ORC table with partitions (dt is the partition column). Querying it from the flink sql client returns all NULLs, although select count does return the number of rows. I have been looking for the cause for several days and really cannot figure it out - please help!!!

Flink SQL> select * from table1 where dt='1609739880002';


<http://apache-flink.147419.n8.nabble.com/file/t1162/null.png> 





Flink SQL> select count(*) from `table1` where  dt='1609739880002';


<http://apache-flink.147419.n8.nabble.com/file/t1162/count.png> 







-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink SQL> query of hive table returns all NULLs

2021-01-03 Post by Jacob
Dear All,

Flink SQL> select * from table1;

Querying hive data from the Flink client returns NULL for every field, although the number of rows is right and select count is correct; the actual values all come back as NULL, and I don't know why. The same query returns data in the hive client.

The hive table's data was loaded from ORC files.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Some issues running insert into with Flink sql

2020-12-30 Post by Jacob
Dear All,

Flink SQL> insert into table1 select filed1,filed2,...,filed10 from table2;

Running a statement like the one above in flink sql, the web UI shows it finished very quickly, but no data is written into table1, and the log shows no errors either. Confusing.

Also, when querying tables with select count(*), some tables return a result while others display nothing, again without any error. I really cannot figure out why...



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink 1.11.2 replacing MapReduce: a Hive Full Join runs very slowly

2020-12-29 Post by Jacob
Dear All,

I have recently been researching some Hive on Flink features. There used to be a MapReduce task whose statement [HQL] is shown below:

The logic is simple: full join two tables (table1 and table2) and write the result into a new table, table3. The three tables table1, table2 and table3 have the same structure, with 35 fields each. The join uses id as the key; for each field, if table2's value is not null and not empty, table2's value is taken, otherwise table1's. Finally the result is written into the new table table3.

So far it has been running for 17h and has not finished. Judging from the data flow it seems nearly done. I am not sure whether my usage is reasonable.

[HQL]
insert into table3 select
if(t2.id is not null and t2.id <> '', t2.id, t1.id) as id
,if(t2.field2 is not null and t2.field2 <> '', t2.field2, t1.field2) as
field2
..
..
..
..
..
..
..
..
,if(t2.field35 is not null and field35.dt <> '', field35.dt , field35.dt )
as field35
from (
select * from table1 where (id is not null and id <> '')
) as t1 full join (
select * from table2 where (id is not null and id <> '')
) as t2 on (t1.id = t2.id)




The code is as follows:



public class FlinkHiveIntegration1 {

public static void main(String[] args) throws Exception {

EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
  
String name = "myhive";
String database = "mydatabase";
String version = "1.1.0-cdh5.8.3";

HiveConf hiveConf = new HiveConf();

hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS,"thrift://***:9083");

hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE,"hdfs://nameservice1/user/hive/warehouse");


HiveCatalog hive = new HiveCatalogTest(name, database, hiveConf ,
version);

tableEnv.registerCatalog(name, hive);
tableEnv.useCatalog(name);
tableEnv.useDatabase(database);
 
String HQL = HQL ;

tableEnv.getConfig().addConfiguration(new
Configuration().set(CoreOptions.DEFAULT_PARALLELISM, 8));

tableEnv.executeSql(HQL );

}
}








<http://apache-flink.147419.n8.nabble.com/file/t1162/hiveonFlink.png> 



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink 1.11.2 client: select * from table

2020-12-28 Post by Jacob
Dear All,

<http://apache-flink.147419.n8.nabble.com/file/t1162/FlinkSQL.png> 

Querying a table in the Flink SQL client with select * from table shows only the header and no data. What could be the reason?

<http://apache-flink.147419.n8.nabble.com/file/t1162/queryTable.png> 

The catalog has already been specified in sql-client-defaults.yaml.



-----
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Some questions about Flink working with hive

2020-12-26 Post by Jacob
Thanks!

I have decided to run it as two jobs for the time being:

one job does the stream processing and generates the data,

the other job (using flink's hive support) does the batch processing.

The main goal is to verify how much resource the second job saves compared with MapReduce.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Some questions about Flink working with hive

2020-12-24 Post by Jacob
Hi,

Thanks for the reply.

Right, it can also be seen that way: overall there are two parts - first process the streaming messages and write them into a hive table every 15 min, then run mapreduce over that 15 minutes of data.

The current state is:
the first step is handled with flink, and the second step is a scheduled job that processes the previous step's data.

The improvement plan:

I want to consolidate these two steps and handle both with flink - newer flink versions support hive, so MapReduce would no longer be needed. What I don't know yet is how to run them smoothly within the same job.




-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Some questions about Flink working with hive

2020-12-22 Post by Jacob
Dear all,

I currently have a Flink job that, after all the business logic has run, produces some business data, writes it to hdfs in ORC format, and calls the hive api
to load the ORC files into a Hive table; at that point the flink job's work is done.

Afterwards, another scheduled Java program runs Mapreduce to do follow-up processing on the data written into hive in the previous step.

Now, with the Flink version upgraded, Flink can work with hive directly and no longer depends on Mapreduce.

But doesn't that then require two flink jobs - one to generate the business data and one to work with hive and process that business data?

The two jobs' execution environments differ. Without hive, the environment is:

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

env.execute("my job");

To work with hive, an environment like this has to be built:

EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
..
tableEnv.executeSql(hql);

Is there a general way to merge these two jobs into one? The effect I want is that once the business data has been generated, hive is operated on directly, replacing the mapreduce work.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Configuring the hiveConf path when Flink connects to Hive

2020-12-18 Post by Jacob
Thanks for the reply.

That was my thinking too - after being submitted to the hadoop cluster it should still be able to read hive's conf dir.

So I asked the DBA; the hive conf path is /etc/hive/conf.

My demo is as follows:

public class FlinkHiveIntegration1 {
public static void main(String[] args) throws Exception {
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name = "myhive";
String defaultDatabase = "nydatabase";
String hiveConfDir = "/etc/hive/conf";  // path to hive-site.xml
String version = "1.1.0-cdh5.8.3";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);

tableEnv.registerCatalog(name, hive);
tableEnv.useCatalog(name);
tableEnv.useDatabase(defaultDatabase);

String hql = "select * from flink2hive_test";
tableEnv.executeSql(hql);
}
}

The job submission command is:
   
./bin/flink run -m yarn-cluster -Djobmanager.memory.process.size=4096m
-Dtaskmanager.memory.process.size=4096m -Dtaskmanager.numberOfTaskSlots=1
-Dyarn.application.name="Flink_to_hive" -c
com.test.flink.function.FlinkHiveIntegration1
/opt/app/Flink/jar/1.11.2/flink-0.0.1-SNAPSHOT.jar --parallelism 1

The error is as follows:
  
2020-12-18 23:41:33,188 FATAL org.apache.hadoop.conf.Configuration  
  
[] - error parsing conf file:/etc/hive/conf/hive-site.xml
java.io.FileNotFoundException: /etc/hive/conf/hive-site.xml (No such file or
directory)

...
...
..
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: java.io.FileNotFoundException: /etc/hive/conf/hive-site.xml
(No such file or directory)

It says the file does not exist.





-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Configuring the hiveConf path when Flink connects to Hive

2020-12-18 Post by Jacob
Dears,

When flink connects to hive, the path where the hiveConf lives has to be configured.

I have downloaded the cluster's hive-site.xml file but don't know which directory it should be placed in.

The job is deployed on Yarn. Which directory should hiveConf point to in the code? Presumably not one on the machine from which I launch the job, since after submission the job runs on the hadoop cluster and cannot find that directory.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink 1.12 job on yarn: how to configure hiveConf when integrating hive

2020-12-18 Post by Jacob
Dear all,

When flink integrates with hive, the hive conf directory has to be configured. My job is submitted on yarn, so how should this hive conf path be configured?

String name = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir = "";  // path to hive-site.xml
String version = "1.1.0-cdh5.8.3";



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink 1.11.2 reading/writing Hive and hive version support

2020-12-17 Post by Jacob
Dear All,

When Flink 1.11.2 works with hive, what does the hive version support look like?

The website says 1.0, 1.1, 1.2, 2.0, 2.1, 2.2, 2.3 and 3.1 are supported.
My execution environment:

*Flink : 1.11.2*
*Hadoop : 2.6.0-cdh5.8.3*
*Hive : 1.1.0-cdh5.8.3*
*Job deployment : on yarn*

Also, I am not sure whether my demo for reading/writing hive is written correctly:

public static void main(String[] args) throws Exception {

EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();

TableEnvironment tableEnv = TableEnvironment.create(settings);

String name = "myhive";
String defaultDatabase = "datafeed";
String hiveConfDir = "/opt/app/bigdata/hive-1.1.0-cdh5.8.3/conf"; 
// path to hive-site.xml
String version = "1.1.0-cdh5.8.3";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);

tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");
String createDbSql = "INSERT INTO TABLE flink2hive_test VALUES
('55', \"333\", \"CHN\")";
tableEnv.sqlUpdate(createDbSql);  
}

Submitting this job to yarn fails with:

Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.mapreduce.TaskAttemptContext

Is a MapReduce-related jar missing?
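
(For what it's worth: org.apache.hadoop.mapreduce.TaskAttemptContext is shipped in the hadoop-mapreduce-client-core artifact, so putting that jar, a full `hadoop classpath`, or the flink-shaded-hadoop-2-uber jar discussed elsewhere in this archive onto the job's classpath is the usual remedy; this is an observation, not something confirmed later in the thread.)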





-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Re: Scala REPL in YARN mode throws NoSuchMethodError

2020-12-16 Post by Jacob
This feels like a conflict between jline and some Scala artifact. I don't know Scala well, but you could try the following:

1. In pom.xml (or other relevant files), exclude the jline transitive dependency from hadoop (and from any other dependency that pulls in jline) and declare jline as a separate dependency.
The problem I ran into was a version conflict around hadoop-common: some dependency pulled in hadoop-common, I excluded hadoop-common from that dependency and then declared hadoop-common separately, and the problem was solved.

2. Change (upgrade) the Scala version.



Thanks!
Jacob



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Scala REPL in YARN mode throws NoSuchMethodError

2020-12-16 Post by Jacob
hi,
Your screenshot does not seem to have uploaded. From your description it is probably a NoSuchMethod-type error. I hit a similar problem a few days ago while upgrading the flink version; it was solved by
exporting the hadoop classpath (export HADOOP_CLASSPATH=`hadoop
classpath`). If that does not solve your problem, try putting flink-shaded-hadoop-2-uber*-*.jar under flink/lib.




Thanks!
Jacob



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: The new Flink WEB UI shows no taskmanager logs

2020-12-16 Post by Jacob
Yes - I saw the project has a logback.xml, and the pom also contains the logback-classic dependency:

<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
</dependency>

After removing this dependency, the logs are visible in the UI!

Thanks!




Thanks!
Jacob



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: The new Flink WEB UI shows no taskmanager logs

2020-12-16 Post by Jacob
Thanks for the reply!

1. The following logger bindings, which conflict with one another, show up in jobmanager.err:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/data/hadoop/dn/sdc/yarn/nm/usercache/***/appcache/application_1603495749855_62368/filecache/24/test_job.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/data/hadoop/dn/sdc/yarn/nm/usercache/***/appcache/application_1603495749855_62368/filecache/32/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p0.2/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type
[ch.qos.logback.classic.util.ContextSelectorStaticBinder]

Does this multiple binding have an impact?

2. The configuration used with this version is as follows:

env.java.home: /usr/java/jdk1.8.0_162
yarn.taskmanager.env.JAVA_HOME: /usr/java/jdk1.8.0_162
containerized.master.env.JAVA_HOME: /usr/java/jdk1.8.0_162
containerized.taskmanager.env.JAVA_HOME: /usr/java/jdk1.8.0_162


jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 3072m
taskmanager.memory.process.size: 3072m
taskmanager.numberOfTaskSlots: 4

yarn.application-attempts: 10
state.backend: filesystem
state.checkpoints.dir: hdfs://nameservice1/prd/website/flink_checkpoint
state.savepoints.dir: hdfs://nameservice1/prd/website/flink_checkpoint
state.backend.incremental: false
state.backend.fs.memory-threshold: 1024
state.checkpoints.num-retained: 3

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 1000
restart-strategy.fixed-delay.delay: 30 s

jobmanager.execution.failover-strategy: region


classloader.resolve-order: parent-first

3. Job deployment: on yarn

4. hadoop version: 2.6

Thanks!
Jacob



--
Sent from: http://apache-flink.147419.n8.nabble.com/


The new Flink WEB UI shows no taskmanager logs

2020-12-16 Post by Jacob
<http://apache-flink.147419.n8.nabble.com/file/t1162/taskmanager.png> 

Hello everyone!

As shown in the image above, why can the taskmanager logs no longer be seen after upgrading flink? The logs printed in my own code are visible under Stdout, but flink's own logs and the springboot-related logs cannot be seen at all. Any idea why? Does the logging system need to be reconfigured after the upgrade?


Thanks!
Jacob



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink 1.12.0 writing ORC files with custom file names

2020-12-15 Post by Jacob
Currently, when Flink writes ORC files, the file prefix and suffix can be configured via the OutputFileConfig class: .withPartPrefix("prefix"), .withPartSuffix(".ext").

The generated files are named in the form: part--

Is there a way to fully customize the generated file name, e.g. "dt=1608006781874", i.e. files produced in the form dt=timestamp? The goal is to load them directly as partitions into a hive table so the hive table is easy to work with afterwards; with flink's default file naming they cannot be loaded into the hive table.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: How to determine the flink-shaded-hadoop-2-uber*-* version

2020-12-14 Post by Jacob
Thanks for the reply!

I have looked at that documentation as well.

A few days ago, while testing job submission against the flink 1.9-1.12 clients, I found that
for 1.10+ versions, manually exporting HADOOP_CLASSPATH=`hadoop
classpath` had no effect - all sorts of errors, mostly missing Hadoop classes or methods (NoSuchMethod-type errors) - and reshuffling the pom was of no use either. In the end, simply adding the flink-shaded-hadoop-2-uber*-* dependency to the pom allowed the job to be submitted and run normally.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

How to determine the flink-shaded-hadoop-2-uber*-* version

2020-12-12 Post by Jacob
When upgrading the flink version, this jar has to be placed under flink/lib - but how is its version number determined?
flink-shaded-hadoop-2-uber*-*



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink 1.10.0 on yarn job submission failure

2020-12-12 Post by Jacob
Hello, what causes this problem when submitting a job with flink 1.10.0 on yarn - a hadoop
jar dependency? The program runs on versions below 1.10 but cannot be submitted on 1.10.0.

Thanks!


[jacob@hadoop001 bin]$ ./yarn logs -applicationId
application_1603495749855_57650
20/12/11 18:52:55 INFO client.RMProxy: Connecting to ResourceManager at
localhost:8032
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/opt/app/hadoop_client/e11_backend/hadoop-2.6.0-cdh5.8.3/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/app/hadoop-2.6.0-cdh5.8.3/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
20/12/11 18:52:57 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable


Container: container_1603495749855_57650_02_01 on localhost
=
LogType:jobmanager.err
Log Upload Time:Fri Dec 11 18:49:21 -0800 2020
LogLength:2368
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/data/hadoop/dn/sdc/yarn/nm/usercache/jacob/appcache/application_1603495749855_57650/filecache/11/datafeed-website-filter_flink-0.0.1-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/data/hadoop/dn/sde/yarn/nm/usercache/jacob/appcache/application_1603495749855_57650/filecache/17/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p0.2/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type
[ch.qos.logback.classic.util.ContextSelectorStaticBinder]
Exception in thread "main" java.lang.NoSuchMethodError:
org.apache.hadoop.conf.Configuration.addDeprecations([Lorg/apache/hadoop/conf/Configuration$DeprecationDelta;)V
at
org.apache.hadoop.mapreduce.util.ConfigUtil.addDeprecatedKeys(ConfigUtil.java:54)
at
org.apache.hadoop.mapreduce.util.ConfigUtil.loadResources(ConfigUtil.java:42)
at org.apache.hadoop.mapred.JobConf.(JobConf.java:119)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1659)
at
org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:91)
at
org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at org.apache.hadoop.security.Groups.(Groups.java:55)
at
org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:182)
at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:235)
at
org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:214)
at
org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:669)
at
org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:571)
at
org.apache.flink.yarn.entrypoint.YarnEntrypointUtils.logYarnEnvironmentInformation(YarnEntrypointUtils.java:136)
at
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:109)



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink 1.11.2 on yarn job submission failure (full logs attached)

2020-12-10 Post by Jacob
Hi all, a problem has been bothering me for a long time.

*After upgrading from Flink 1.7.2 to 1.11.2, the job cannot be submitted*

[Symptoms]

The demo job (WordCount) can be submitted, but my own job cannot; my job touches Hadoop, reads ORC, writes hive, and so on.

When submitting my own job, the various logs are as follows (the attached screenshots did not survive in the archive):

Flink configuration file:

env.java.home: /usr/java/jdk1.8.0_162
yarn.taskmanager.env.JAVA_HOME: /usr/java/jdk1.8.0_162
containerized.master.env.JAVA_HOME: /usr/java/jdk1.8.0_162
containerized.taskmanager.env.JAVA_HOME: /usr/java/jdk1.8.0_162


jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
jobmanager.execution.failover-strategy: region

taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 10
yarn.application-attempts: 10

state.backend: filesystem
state.checkpoints.dir: hdfs:///user/flink/flink-checkpoints
state.savepoints.dir: hdfs:///user/flink/flink-savepoints
state.backend.incremental: false
state.backend.fs.memory-threshold: 1024
state.checkpoints.num-retained: 3
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 1000
restart-strategy.fixed-delay.delay: 30 s


Project POM file (the XML markup was stripped in the archive; the recoverable properties and dependencies are listed as coordinates):

properties:
  flink.version = 1.11.2
  scala.binary.version = 2.11
  kafka.version = 0.10
  hive.version = 1.1.0-cdh5.8.3
  Java source/target 1.8, encoding UTF-8

dependencies:
  org.apache.orc:orc-core:1.5.4 (excluding hadoop-common, hadoop-hdfs, slf4j-api)
  org.redisson:redisson:3.12.4 (excluding slf4j-api)
  com.microsoft.sqlserver:mssql-jdbc:6.2.2.jre8
  org.apache.hive:hive-exec:${hive.version} (excluding *:*)
  ch.qos.logback:logback-classic:1.2.3 (excluding slf4j-api)
  com.sun.mail:javax.mail:1.6.2
  org.apache.flink:flink-connector-kafka-${kafka.version}_${scala.binary.version}:${flink.version}
  org.apache.flink:flink-streaming-java_${scala.binary.version}:${flink.version} (provided)
  org.apache.flink:flink-connector-filesystem_${scala.binary.version}:${flink.version}
  junit:junit:4.12 (test)
  org.apache.hive:hive-jdbc:1.1.0 (excluding slf4j-api)
  org.apache.hadoop:hadoop-common:2.6.0-cdh5.8.3 (excluding slf4j-api)
  ...

I have been at this for a long time without finding the cause. At first the job could not be submitted at all and this portal was not even visible; later some conflicting Hadoop jars were found and excluded, see the pom.

Please advise, thank you!



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.11.2 on yarn: available slots always 0, job cannot be submitted

2020-12-09 Post by Jacob
*Thank you for your reply!*

The logs and the pom file are below.
Container: container_1603495749855_55197_02_01 on hadoop01
=
LogType:jobmanager.err
Log Upload Time:Wed Dec 09 17:03:38 -0800 2020
LogLength:802
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/data/hadoop/dn/sde/yarn/nm/usercache/jacob/appcache/application_1603495749855_55197/filecache/26/test_job.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/data/hadoop/dn/sdd/yarn/nm/usercache/jacob/appcache/application_1603495749855_55197/filecache/33/test_job.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p0.2/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type
[ch.qos.logback.classic.util.ContextSelectorStaticBinder]

LogType:jobmanager.log
Log Upload Time:Wed Dec 09 17:03:38 -0800 2020
LogLength:980
Log Contents:
2020-12-09 17:03:31,918 WARN  org.apache.hadoop.conf.Configuration  
  
[] - /run/cloudera-scm-agent/process/19140-yarn-NODEMANAGER/core-site.xml:an
attempt to override final parameter: hadoop.ssl.require.client.cert;
Ignoring.
2020-12-09 17:03:31,931 WARN  org.apache.hadoop.conf.Configuration  
  
[] - /run/cloudera-scm-agent/process/19140-yarn-NODEMANAGER/core-site.xml:an
attempt to override final parameter: hadoop.ssl.keystores.factory.class;
Ignoring.
2020-12-09 17:03:31,931 WARN  org.apache.hadoop.conf.Configuration  
  
[] - /run/cloudera-scm-agent/process/19140-yarn-NODEMANAGER/core-site.xml:an
attempt to override final parameter: hadoop.ssl.server.conf;  Ignoring.
2020-12-09 17:03:31,932 WARN  org.apache.hadoop.conf.Configuration  
  
[] - /run/cloudera-scm-agent/process/19140-yarn-NODEMANAGER/core-site.xml:an
attempt to override final parameter: hadoop.ssl.client.conf;  Ignoring.

LogType:jobmanager.out
Log Upload Time:Wed Dec 09 17:03:38 -0800 2020
LogLength:2188
Log Contents:
2020-12-09 17:03:36.375 [main] ERROR o.a.f.r.entrypoint.ClusterEntrypoint -
Could not start cluster entrypoint YarnJobClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
initialize the cluster entrypoint YarnJobClusterEntrypoint.
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:520)
at
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:89)
Caused by: java.lang.NoSuchMethodError:
org.apache.hadoop.conf.Configuration.addDeprecations([Lorg/apache/hadoop/conf/Configuration$DeprecationDelta;)V
at
org.apache.hadoop.yarn.conf.YarnConfiguration.addDeprecatedKeys(YarnConfiguration.java:79)
at
org.apache.hadoop.yarn.conf.YarnConfiguration.(YarnConfiguration.java:73)
at
org.apache.flink.yarn.YarnResourceManager.(YarnResourceManager.java:155)
at
org.apache.flink.yarn.entrypoint.YarnResourceManagerFactory.createResourceManager(YarnResourceManagerFactory.java:76)
at
org.apache.flink.runtime.resourcemanager.ResourceManagerFactory.createResourceManager(ResourceManagerFactory.java:61)
at
org.apache.flink.runtime.resourcemanager.ActiveResourceManagerFactory.createResourceManager(ActiveResourceManagerFactory.java:58)
at
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:167)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:219)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:172)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:171)
... 2 common frames omitted



Container: container_1603495749855_55197_01_01 on hadoop01
=
LogType:jobmanager.err
Log Upload Time:Wed Dec 09 17:03:37 -0800 2020
LogLength:802
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/data/hadoop/dn/sdc/yarn/nm/usercache/jacob/appcache/application_1603495749855_55197/filecache/26/test_job.jar!/org/slf4j/impl

Flink 1.11.2 on yarn job submission failure

2020-12-09 Post by Jacob
*After upgrading from flink 1.7.2 to 1.11.2, the job cannot be submitted*
The code and pom file were not modified at all. Submitting from the 1.7.2 client works without any problem; submitting and starting the job with 1.11.2 fails with the following logs (./yarn logs):

Container: container_1603495749855_55197_02_01 on hadoop01
=
LogType:jobmanager.err
Log Upload Time:Wed Dec 09 17:03:38 -0800 2020
LogLength:802
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/data/hadoop/dn/sde/yarn/nm/usercache/jacob/appcache/application_1603495749855_55197/filecache/26/test_job.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/data/hadoop/dn/sdd/yarn/nm/usercache/jacob/appcache/application_1603495749855_55197/filecache/33/test_job.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p0.2/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type
[ch.qos.logback.classic.util.ContextSelectorStaticBinder]

LogType:jobmanager.log
Log Upload Time:Wed Dec 09 17:03:38 -0800 2020
LogLength:980
Log Contents:
2020-12-09 17:03:31,918 WARN  org.apache.hadoop.conf.Configuration  
  
[] - /run/cloudera-scm-agent/process/19140-yarn-NODEMANAGER/core-site.xml:an
attempt to override final parameter: hadoop.ssl.require.client.cert; 
Ignoring.
2020-12-09 17:03:31,931 WARN  org.apache.hadoop.conf.Configuration  
  
[] - /run/cloudera-scm-agent/process/19140-yarn-NODEMANAGER/core-site.xml:an
attempt to override final parameter: hadoop.ssl.keystores.factory.class; 
Ignoring.
2020-12-09 17:03:31,931 WARN  org.apache.hadoop.conf.Configuration  
  
[] - /run/cloudera-scm-agent/process/19140-yarn-NODEMANAGER/core-site.xml:an
attempt to override final parameter: hadoop.ssl.server.conf;  Ignoring.
2020-12-09 17:03:31,932 WARN  org.apache.hadoop.conf.Configuration  
  
[] - /run/cloudera-scm-agent/process/19140-yarn-NODEMANAGER/core-site.xml:an
attempt to override final parameter: hadoop.ssl.client.conf;  Ignoring.

LogType:jobmanager.out
Log Upload Time:Wed Dec 09 17:03:38 -0800 2020
LogLength:2188
Log Contents:
2020-12-09 17:03:36.375 [main] ERROR o.a.f.r.entrypoint.ClusterEntrypoint -
Could not start cluster entrypoint YarnJobClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
initialize the cluster entrypoint YarnJobClusterEntrypoint.
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:520)
at
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:89)
Caused by: java.lang.NoSuchMethodError:
org.apache.hadoop.conf.Configuration.addDeprecations([Lorg/apache/hadoop/conf/Configuration$DeprecationDelta;)V
at
org.apache.hadoop.yarn.conf.YarnConfiguration.addDeprecatedKeys(YarnConfiguration.java:79)
at
org.apache.hadoop.yarn.conf.YarnConfiguration.(YarnConfiguration.java:73)
at
org.apache.flink.yarn.YarnResourceManager.(YarnResourceManager.java:155)
at
org.apache.flink.yarn.entrypoint.YarnResourceManagerFactory.createResourceManager(YarnResourceManagerFactory.java:76)
at
org.apache.flink.runtime.resourcemanager.ResourceManagerFactory.createResourceManager(ResourceManagerFactory.java:61)
at
org.apache.flink.runtime.resourcemanager.ActiveResourceManagerFactory.createResourceManager(ActiveResourceManagerFactory.java:58)
at
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:167)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:219)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:172)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:171)
... 2 common frames omitted



Container: container_1603495749855_55197_01_01 on hadoop01
=
LogType:jobmanager.err
Log Upload Time:Wed Dec 09 17:03:37 -0800 2020
LogLength:802
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/data/hadoop/dn/sdc/yarn/nm/usercache/jacob/appcache


Re: Flink 1.11.2 on yarn error

2020-12-09 Post by Jacob
The problem has been fixed; it was indeed a java version issue!



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink 1.11.2 on yarn: available slots always 0, job cannot be submitted

2020-12-09 Post by Jacob
(screenshot not preserved in the archive)

Launch command:
./bin/flink run-application -t yarn-application
-Djobmanager.memory.process.size=2048m
-Dtaskmanager.memory.process.size=2048m -Dyarn.application.name="Test Job"
-c com.jacob.Main /opt/app/test.jar

The Hadoop cluster has ample resources, yet flink cannot allocate slots for the job.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink 1.11.2 on yarn error

2020-12-08 Post by Jacob
*[Environment]*
Flink version: 1.11.2
Hadoop version: 2.6.0-cdh5.8.3
Java version: 1.8.0_144
-
*[Command]*
[jacob@localhost flink-1.11.2]$ ./bin/yarn-session.sh -jm 1024m -tm 2048m
*[Symptoms]*

2020-12-08 18:06:00,134 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli 
  
[] - Error while running the Flink session.
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
deploy Yarn session cluster
at
org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:382)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:514)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$4(FlinkYarnSessionCli.java:751)
~[flink-dist_2.11-1.11.2.jar:1.11.2]   
at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
at javax.security.auth.Subject.doAs(Subject.java:423) ~[?:?]
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
~[hadoop-common-2.6.0-cdh5.8.3.jar:?]
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:751)
[flink-dist_2.11-1.11.2.jar:1.11.2]
Caused by:
org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1603495749855_54023 failed 1
times due to AM Container for appattempt_1603495749855_54023_01 exited
with  exitCode: 1
For more detailed output, check application tracking
page:http://***:8088/proxy/application_1603495749855_54023/Then, click
on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1603495749855_54023_01_01
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:601)
at org.apache.hadoop.util.Shell.run(Shell.java:504)   
at
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:786)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:213)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to further
investigate the issue:
yarn logs -applicationId application_1603495749855_54023
at
org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1021)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:524)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:375)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
... 7 more


 The program finished with the following exception:

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
deploy Yarn session cluster
at
org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:382)
at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:514)
at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$4(FlinkYarnSessionCli.java:751)
at java.base/java.security.AccessController.doPrivileged(Native
Method)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:751)
Caused by:
org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1603495749855_54023 failed 1
times due to AM Container for appattempt_1603495749855_54023_01 exited
with  exitCode: 1
For more detailed output, check a