Re:求使用oracle jdk8的flink docker镜像Dockerfile

2021-11-09 文章 欧阳武林



 我遇到过和你一样的问题,dockerfile 脚本如下,你把jdk的包和dockerfile放到同一个目录下。然后再执行docker 
build命令,就能打包出来有jstack,jps命令的image了。




```


FROM flink

RUN mkdir -p $FLINK_HOME/usrlib

RUN mkdir -p $FLINK_HOME/.kube

COPY jdk1.8.0_301 /usr/lib/jdk1.8.0_301

ENV JAVA_HOME /usr/lib/jdk1.8.0_301

ENV PATH ${JAVA_HOME}/bin:$PATH

COPY ./config $FLINK_HOME/.kube

RUN chown flink:flink $FLINK_HOME/.kube/config

RUN chmod 644 $FLINK_HOME/.kube/config

```















欧阳武林


18896723...@139.com


18896723655







电子名片新出VIP模板啦,快来体验>>




扫一扫,


快速添加名片到手机







The following is the content of the forwarded email
From:"casel.chen" 
To:"user-zh@flink.apache.org" 
Date:2021-11-09 16:49:35
Subject:求使用oracle jdk8的flink docker镜像Dockerfile

查了下flink官方docker image https://github.com/apache/flink-docker  
是基于openjdk的,体积虽然小,但少了很多工具,例如jstack,jps, jstat, jmap等。
当作业出现问题时这些工具可以派上用场。问一下要怎么换成oracle jdk8? 求一份 Dockerfile,谢谢!





Re: FlinkSQL 使用 streamingSink 写入 hive orc数据,如何控制文件数量。

2021-11-09 文章 yidan zhao
此外,按照event
time分区的情况下,迟到数据怎么处理的。如果是streaming情况,window算子,迟到数据是丢弃的。对于flinksql这种从kafka写到hive,只是依靠event
time做分区的情况,迟到数据是什么表现呢。

yidan zhao  于2021年11月10日周三 下午1:03写道:

> 另外,写到hdfs后文件命名为.开头,最近发现部分有..开头的。请问..开头和.开头什么区别呢,是不是..开头是没用了已经。
>
> 比如有检查点ckpt1,ckpt2,...然后失败,重启后,基于ckpt2重启,那么ckpt2之后生成的部分数据文件会被命名为..开头表示废弃,然后重启后重新创建.开头的文件这么写,是吗。
>
> yidan zhao  于2021年11月9日周二 上午10:50写道:
>
>> 关于FlinkSQL写hive,orc格式,性能和稳定性方面有什么建议吗。
>>
>> 比如并行度设置多少合理,目前compact-coordinator并行度定死为1,不可更改应该,compact-operator是60,日常来看compact-operator经常是红色,busy100%。目前问题是偶尔会发现检查点失败,延迟等,导致实际现象是文件没合并,进而inode不足。(我们的inode的quota不足实际是)。
>>
>>
>> 4个task节点,source、compact-coordinator、compact-operator、partition-commiter,分别考虑什么设置并行度呢,仅针对能设置的部分。
>> 比如souce部分我主要考虑数据量,不清楚这个compact-operator的并行主要考虑啥,也是数据量吗?
>>
>>
>> 此外,也没有可能kafka2hdfs和compact分成2个线做,互不影响。
>>
>> Caizhi Weng  于2021年11月5日周五 下午1:35写道:
>>
>>> Hi!
>>>
>>> 1 换言之,是针对每个检查点,合并了多个并发subtask产生的文件对吧。
>>>
>>> 正确
>>>
>>> 2 除此以外,多个检查点之间的文件是没有办法合并的对吧。
>>>
>>> 正确
>>>
>>> 实际部分节点做的是后台IO了事情,是不是反映不到busy情况上
>>>
>>> 是的,busy 的计算方式是通过采样看有多少个线程正在工作。对于 sink 这种线程都在等待后台 io 的节点来说确实 busy 值不会很高。
>>>
>>> yidan zhao  于2021年11月4日周四 下午5:57写道:
>>>
>>> > hi,还想继续问下。这个合并机制,根据文档介绍如下。
>>> > Whether to enable automatic compaction in streaming sink or not. The
>>> data
>>> > will be written to temporary files. After the checkpoint is completed,
>>> the
>>> > temporary files generated by a checkpoint will be compacted. The
>>> temporary
>>> > files are invisible before compaction.
>>> > 看文档,是指每次检查点完成后,会将单个检查点产生的文件进行合并。也就是说只有单个检查点产生的文件会被合并。
>>> > 1 换言之,是针对每个检查点,合并了多个并发subtask产生的文件对吧。
>>> >
>>> > 2 除此以外,多个检查点之间的文件是没有办法合并的对吧。
>>> >
>>> > 3 另外一个问题:目前看flinksql写hive,streaming情况。从web
>>> >
>>> >
>>> ui上看不开启compact情况下,几乎每个节点都是蓝色,而且数据量不大。开启compact情况,几乎也都是蓝色,数据量也不大,但只有compact节点是持续红色。
>>> >
>>> >
>>> 按照我的理解写hive这种情况下,实际部分节点做的是后台IO了事情,是不是反映不到busy情况上,busy比如只考虑对接受元素的处理,至于这个元素导致这个算子有多少background的工作并反映不出来。对吗。
>>> > 所以即使看起来都是蓝色的,也不能降低并行度,而是自行根据数据量采用一个差不多的并行度。
>>> >
>>>
>>


Re: FlinkSQL 使用 streamingSink 写入 hive orc数据,如何控制文件数量。

2021-11-09 文章 yidan zhao
另外,写到hdfs后文件命名为.开头,最近发现部分有..开头的。请问..开头和.开头什么区别呢,是不是..开头是没用了已经。
比如有检查点ckpt1,ckpt2,...然后失败,重启后,基于ckpt2重启,那么ckpt2之后生成的部分数据文件会被命名为..开头表示废弃,然后重启后重新创建.开头的文件这么写,是吗。

yidan zhao  于2021年11月9日周二 上午10:50写道:

> 关于FlinkSQL写hive,orc格式,性能和稳定性方面有什么建议吗。
>
> 比如并行度设置多少合理,目前compact-coordinator并行度定死为1,不可更改应该,compact-operator是60,日常来看compact-operator经常是红色,busy100%。目前问题是偶尔会发现检查点失败,延迟等,导致实际现象是文件没合并,进而inode不足。(我们的inode的quota不足实际是)。
>
>
> 4个task节点,source、compact-coordinator、compact-operator、partition-commiter,分别考虑什么设置并行度呢,仅针对能设置的部分。
> 比如souce部分我主要考虑数据量,不清楚这个compact-operator的并行主要考虑啥,也是数据量吗?
>
>
> 此外,也没有可能kafka2hdfs和compact分成2个线做,互不影响。
>
> Caizhi Weng  于2021年11月5日周五 下午1:35写道:
>
>> Hi!
>>
>> 1 换言之,是针对每个检查点,合并了多个并发subtask产生的文件对吧。
>>
>> 正确
>>
>> 2 除此以外,多个检查点之间的文件是没有办法合并的对吧。
>>
>> 正确
>>
>> 实际部分节点做的是后台IO了事情,是不是反映不到busy情况上
>>
>> 是的,busy 的计算方式是通过采样看有多少个线程正在工作。对于 sink 这种线程都在等待后台 io 的节点来说确实 busy 值不会很高。
>>
>> yidan zhao  于2021年11月4日周四 下午5:57写道:
>>
>> > hi,还想继续问下。这个合并机制,根据文档介绍如下。
>> > Whether to enable automatic compaction in streaming sink or not. The
>> data
>> > will be written to temporary files. After the checkpoint is completed,
>> the
>> > temporary files generated by a checkpoint will be compacted. The
>> temporary
>> > files are invisible before compaction.
>> > 看文档,是指每次检查点完成后,会将单个检查点产生的文件进行合并。也就是说只有单个检查点产生的文件会被合并。
>> > 1 换言之,是针对每个检查点,合并了多个并发subtask产生的文件对吧。
>> >
>> > 2 除此以外,多个检查点之间的文件是没有办法合并的对吧。
>> >
>> > 3 另外一个问题:目前看flinksql写hive,streaming情况。从web
>> >
>> >
>> ui上看不开启compact情况下,几乎每个节点都是蓝色,而且数据量不大。开启compact情况,几乎也都是蓝色,数据量也不大,但只有compact节点是持续红色。
>> >
>> >
>> 按照我的理解写hive这种情况下,实际部分节点做的是后台IO了事情,是不是反映不到busy情况上,busy比如只考虑对接受元素的处理,至于这个元素导致这个算子有多少background的工作并反映不出来。对吗。
>> > 所以即使看起来都是蓝色的,也不能降低并行度,而是自行根据数据量采用一个差不多的并行度。
>> >
>>
>


flink 1.12.1 写入hdfs 存在inProgress文件

2021-11-09 文章 jie han
Hi!
 我们现在使用flink1.12.1版本的flink, 使用StreamingFileSink写入hdfs, 发现存在以下两个问题点。
  1: 官方文档明确提示出: 任务关闭未采用savepoint/ checkpoint 重启 会导致 最后一批的文件无法复原的状态,保持在
inProgress 状态
  例如: 路径 /data/user/test 中存在三个文件
  part-0-0
  part-0-1
  .part-0-2.inprogress.952eb958-dac9-4f2c-b92f-9084ed536a1c
   2: 任务再次启动产生的Buckets 的 永远从 0开始往上递增,上一次任务产生了一些文件。
 同一个任务再次启动后文件:
  part-0-0
  part-0-1
  .part-0-2.inprogress.952eb958-dac9-4f2c-b92f-9084ed536a1c
  .part-0-0.inprogress.0e2f234b-042d-4232-a5f7-c980f04ca82d文件
  由于part-0-0文件已经存在,.part-0-0.inprogress.0e2f234b-042d-4232-a5f7-c980f04ca82d
始终无法变更成part-0-0。


Re: flink 1.13.2 在 Java/Scala 程序中调用 Python UDF函数,通过yarn-application执行,yarn集群的每台机器都需要安装pyflink?

2021-11-09 文章 Dian Fu
FYI

On Wed, Nov 10, 2021 at 9:31 AM Dian Fu  wrote:

> 也可以通过以下方式:
> - Python libraries [1]: 把PyFlink以及其他相关依赖打包,作为依赖指定
> - Python archieves [2]: 构建Python虚拟环境,并在里面安装PyFlink以及其他依赖,作为依赖指定
>
>
> 但是上述方式相对于直接在集群上安装来说,提交作业的时候,Flink内部需要把相关文件分发到集群节点上,如果文件比较大,有一点的overhead,会降低启动速度。
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/dependency_management/#python-libraries
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/dependency_management/#archives
>
> On Tue, Nov 9, 2021 at 2:01 PM Asahi Lee <978466...@qq.com.invalid> wrote:
>
>> HI!
>>   我现在使用flink 1.13.2,通过java table api开发应用,其中要使用python
>> udf函数,最终通过yarn-application方式提交;那我需要在yarn集群的机器上都安装pyflink?还是有其他方案?
>
>


Re: flink 1.13.2 在 Java/Scala 程序中调用 Python UDF函数,通过yarn-application执行,yarn集群的每台机器都需要安装pyflink?

2021-11-09 文章 Dian Fu
也可以通过以下方式:
- Python libraries [1]: 把PyFlink以及其他相关依赖打包,作为依赖指定
- Python archieves [2]: 构建Python虚拟环境,并在里面安装PyFlink以及其他依赖,作为依赖指定

但是上述方式相对于直接在集群上安装来说,提交作业的时候,Flink内部需要把相关文件分发到集群节点上,如果文件比较大,有一点的overhead,会降低启动速度。

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/dependency_management/#python-libraries
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/dependency_management/#archives

On Tue, Nov 9, 2021 at 2:01 PM Asahi Lee <978466...@qq.com.invalid> wrote:

> HI!
>   我现在使用flink 1.13.2,通过java table api开发应用,其中要使用python
> udf函数,最终通过yarn-application方式提交;那我需要在yarn集群的机器上都安装pyflink?还是有其他方案?


Could not start rest endpoint on any port in port range 8081报错,但是8081未占用啊

2021-11-09 文章 Geoff nie
大佬好!
flink ./start-cluster.sh 启动过程中如下错误,但是该端口号未占用啊,请大佬帮看看。谢谢


org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:212)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)
 [flink-dist_2.11-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:59)
 [flink-dist_2.11-1.13.3.jar:1.13.3]
Caused by: org.apache.flink.util.FlinkException: Could not create the 
DispatcherResourceManagerComponent.
at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:275)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at java.security.AccessController.doPrivileged(Native Method) 
~[?:1.8.0_171]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_171]
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
 ~[flink-shaded-hadoop-2-uber-2.7.5-9.0.jar:2.7.5-9.0]
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
... 2 more
Caused by: java.net.BindException: Could not start rest endpoint on any port in 
port range 8081
at 
org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:234)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at java.security.AccessController.doPrivileged(Native Method) 
~[?:1.8.0_171]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_171]
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
 ~[flink-shaded-hadoop-2-uber-2.7.5-9.0.jar:2.7.5-9.0]
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
... 2 more









求使用oracle jdk8的flink docker镜像Dockerfile

2021-11-09 文章 casel.chen
查了下flink官方docker image https://github.com/apache/flink-docker  
是基于openjdk的,体积虽然小,但少了很多工具,例如jstack,jps, jstat, jmap等。
当作业出现问题时这些工具可以派上用场。问一下要怎么换成oracle jdk8? 求一份 Dockerfile,谢谢!

Re:回复: Re: 提交flink作业抛 java.lang.LinkageError

2021-11-09 文章 casel.chen
试过设置成provided,作业运行会抛ClassNotFoundError: 
org/apache/kafka/clients/consumer/ConsumerRecord





在 2021-11-09 09:54:59,"WuKong"  写道:
>Hi :
>看报错日志,还是类加载问题 提示的报错信息 是说已经由不同类加载器已经加装了改依赖。如果生产环境上已经由了相关依赖包,建议将依赖设置为provided
>
>Caused by: java.lang.LinkageError: loader constraint violation: loader
>>> (instance of org/apache/flink/util/ChildFirstClassLoader) previously
>>> initiated loading for a different type with name
>>> "org/apache/kafka/clients/consumer/ConsumerRecord"
>
>
>
>---
>Best,
>WuKong
> 
>发件人: casel.chen
>发送时间: 2021-11-08 14:38
>收件人: user-zh
>主题: Re:Re: 提交flink作业抛 java.lang.LinkageError
>版本是一致的,都是1.12.5版本
> 
> 
> 
> 
>在 2021-11-08 11:11:35,"Shuiqiang Chen"  写道:
>>Hi,
>>
>>能检查下作业jar里 kafka client的版本和平台上的是否一致吗?
>>
>>casel.chen  于2021年11月5日周五 下午11:25写道:
>>
>>> 我在公司实时计算平台上提交了一个streaming api写的作业,结果抛如下异常。因为我们的实时计算平台是以flink
>>> sql为主的,上面已经集成了flink-kafka-connector。而我提交的作业也是需要从kafka消费,所以将相同版本的flink kafka
>>> connector也打进了作业jar包内。请问是什么原因造成的,需要如何修复?谢谢!
>>>
>>>
>>> 2021-11-05 16:38:58 -  [submit-session-executor-6] ERROR
>>> c.h.s.launcher.AbstractJobExecutor - -start job failed-
>>>
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>> caused an error:
>>>
>>>
>>>
>>>
>>>
>>>
>>> Classpath:
>>> [file:/opt/streamsql/jobs/aml-aml-aml/aml-datasync/TEST/aml-datasync-1.0-SNAPSHOT_zwb3274543418822102949.jar]
>>>
>>>
>>>
>>>
>>>
>>>
>>> System.out: (none)
>>>
>>>
>>>
>>>
>>>
>>>
>>> System.err: (none)
>>>
>>>
>>> at
>>> org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:264)
>>>
>>>
>>> at
>>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:172)
>>>
>>>
>>> at
>>> com.huifu.streamsql.launcher.AbstractJobExecutor.createJobGraph(AbstractJobExecutor.java:205)
>>>
>>>
>>> at
>>> com.huifu.streamsql.launcher.standalone.RemoteExecutor.doStart(RemoteExecutor.java:31)
>>>
>>>
>>> at
>>> com.huifu.streamsql.launcher.AbstractJobExecutor.start(AbstractJobExecutor.java:51)
>>>
>>>
>>> at com.huifu.streamsql.launcher.JobCommand$1.execute(JobCommand.java:15)
>>>
>>>
>>> at
>>> com.huifu.streamsql.service.StreamSqlServiceImpl.submitJob(StreamSqlServiceImpl.java:443)
>>>
>>>
>>> at
>>> com.huifu.kunpeng.service.DeploymentServiceImpl.submitJob(DeploymentServiceImpl.java:1662)
>>>
>>>
>>> at
>>> com.huifu.kunpeng.service.DeploymentServiceImpl.launchDeployment(DeploymentServiceImpl.java:1623)
>>>
>>>
>>> at
>>> com.huifu.kunpeng.service.DeploymentServiceImpl$$FastClassBySpringCGLIB$$855501cb.invoke()
>>>
>>>
>>> at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
>>>
>>>
>>> at
>>> org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
>>>
>>>
>>> at
>>> org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
>>>
>>>
>>> at
>>> org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
>>>
>>>
>>> at
>>> org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor.invoke(AnnotationAwareRetryOperationsInterceptor.java:156)
>>>
>>>
>>> at
>>> org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
>>>
>>>
>>> at
>>> org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
>>>
>>>
>>> at
>>> org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
>>>
>>>
>>> at
>>> com.huifu.kunpeng.service.DeploymentServiceImpl$$EnhancerBySpringCGLIB$$9aed5b42.launchDeployment()
>>>
>>>
>>> at
>>> com.huifu.kunpeng.runner.SubmitQueueApplicationRunner.lambda$run$0(SubmitQueueApplicationRunner.java:63)
>>>
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>
>>>
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>> Caused by: java.lang.LinkageError: loader constraint violation: loader
>>> (instance of org/apache/flink/util/ChildFirstClassLoader) previously
>>> initiated loading for a different type with name
>>> "org/apache/kafka/clients/consumer/ConsumerRecord"
>>>
>>>
>>> at java.lang.ClassLoader.defineClass1(Native Method)
>>>
>>>
>>> at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
>>>
>>>
>>> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>>
>>>
>>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>>>
>>>
>>> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>>>
>>>
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>>>
>>>
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>>>
>>>
>>> at java.security.AccessController.doPrivileged(Native Method)