Re:求使用oracle jdk8的flink docker镜像Dockerfile
我遇到过和你一样的问题,dockerfile 脚本如下,你把jdk的包和dockerfile放到同一个目录下。然后再执行docker build命令,就能打包出来有jstack,jps命令的image了。 ``` FROM flink RUN mkdir -p $FLINK_HOME/usrlib RUN mkdir -p $FLINK_HOME/.kube COPY jdk1.8.0_301 /usr/lib/jdk1.8.0_301 ENV JAVA_HOME /usr/lib/jdk1.8.0_301 ENV PATH ${JAVA_HOME}/bin:$PATH COPY ./config $FLINK_HOME/.kube RUN chown flink:flink $FLINK_HOME/.kube/config RUN chmod 644 $FLINK_HOME/.kube/config ``` 欧阳武林 18896723...@139.com 18896723655 电子名片新出VIP模板啦,快来体验>> 扫一扫, 快速添加名片到手机 The following is the content of the forwarded email From:"casel.chen" To:"user-zh@flink.apache.org" Date:2021-11-09 16:49:35 Subject:求使用oracle jdk8的flink docker镜像Dockerfile 查了下flink官方docker image https://github.com/apache/flink-docker 是基于openjdk的,体积虽然小,但少了很多工具,例如jstack,jps, jstat, jmap等。 当作业出现问题时这些工具可以派上用场。问一下要怎么换成oracle jdk8? 求一份 Dockerfile,谢谢!
Re: FlinkSQL 使用 streamingSink 写入 hive orc数据,如何控制文件数量。
此外,按照event time分区的情况下,迟到数据怎么处理的。如果是streaming情况,window算子,迟到数据是丢弃的。对于flinksql这种从kafka写到hive,只是依靠event time做分区的情况,迟到数据是什么表现呢。 yidan zhao 于2021年11月10日周三 下午1:03写道: > 另外,写到hdfs后文件命名为.开头,最近发现部分有..开头的。请问..开头和.开头什么区别呢,是不是..开头是没用了已经。 > > 比如有检查点ckpt1,ckpt2,...然后失败,重启后,基于ckpt2重启,那么ckpt2之后生成的部分数据文件会被命名为..开头表示废弃,然后重启后重新创建.开头的文件这么写,是吗。 > > yidan zhao 于2021年11月9日周二 上午10:50写道: > >> 关于FlinkSQL写hive,orc格式,性能和稳定性方面有什么建议吗。 >> >> 比如并行度设置多少合理,目前compact-coordinator并行度定死为1,不可更改应该,compact-operator是60,日常来看compact-operator经常是红色,busy100%。目前问题是偶尔会发现检查点失败,延迟等,导致实际现象是文件没合并,进而inode不足。(我们的inode的quota不足实际是)。 >> >> >> 4个task节点,source、compact-coordinator、compact-operator、partition-commiter,分别考虑什么设置并行度呢,仅针对能设置的部分。 >> 比如souce部分我主要考虑数据量,不清楚这个compact-operator的并行主要考虑啥,也是数据量吗? >> >> >> 此外,也没有可能kafka2hdfs和compact分成2个线做,互不影响。 >> >> Caizhi Weng 于2021年11月5日周五 下午1:35写道: >> >>> Hi! >>> >>> 1 换言之,是针对每个检查点,合并了多个并发subtask产生的文件对吧。 >>> >>> 正确 >>> >>> 2 除此以外,多个检查点之间的文件是没有办法合并的对吧。 >>> >>> 正确 >>> >>> 实际部分节点做的是后台IO了事情,是不是反映不到busy情况上 >>> >>> 是的,busy 的计算方式是通过采样看有多少个线程正在工作。对于 sink 这种线程都在等待后台 io 的节点来说确实 busy 值不会很高。 >>> >>> yidan zhao 于2021年11月4日周四 下午5:57写道: >>> >>> > hi,还想继续问下。这个合并机制,根据文档介绍如下。 >>> > Whether to enable automatic compaction in streaming sink or not. The >>> data >>> > will be written to temporary files. After the checkpoint is completed, >>> the >>> > temporary files generated by a checkpoint will be compacted. The >>> temporary >>> > files are invisible before compaction. >>> > 看文档,是指每次检查点完成后,会将单个检查点产生的文件进行合并。也就是说只有单个检查点产生的文件会被合并。 >>> > 1 换言之,是针对每个检查点,合并了多个并发subtask产生的文件对吧。 >>> > >>> > 2 除此以外,多个检查点之间的文件是没有办法合并的对吧。 >>> > >>> > 3 另外一个问题:目前看flinksql写hive,streaming情况。从web >>> > >>> > >>> ui上看不开启compact情况下,几乎每个节点都是蓝色,而且数据量不大。开启compact情况,几乎也都是蓝色,数据量也不大,但只有compact节点是持续红色。 >>> > >>> > >>> 按照我的理解写hive这种情况下,实际部分节点做的是后台IO了事情,是不是反映不到busy情况上,busy比如只考虑对接受元素的处理,至于这个元素导致这个算子有多少background的工作并反映不出来。对吗。 >>> > 所以即使看起来都是蓝色的,也不能降低并行度,而是自行根据数据量采用一个差不多的并行度。 >>> > >>> >>
Re: FlinkSQL 使用 streamingSink 写入 hive orc数据,如何控制文件数量。
另外,写到hdfs后文件命名为.开头,最近发现部分有..开头的。请问..开头和.开头什么区别呢,是不是..开头是没用了已经。 比如有检查点ckpt1,ckpt2,...然后失败,重启后,基于ckpt2重启,那么ckpt2之后生成的部分数据文件会被命名为..开头表示废弃,然后重启后重新创建.开头的文件这么写,是吗。 yidan zhao 于2021年11月9日周二 上午10:50写道: > 关于FlinkSQL写hive,orc格式,性能和稳定性方面有什么建议吗。 > > 比如并行度设置多少合理,目前compact-coordinator并行度定死为1,不可更改应该,compact-operator是60,日常来看compact-operator经常是红色,busy100%。目前问题是偶尔会发现检查点失败,延迟等,导致实际现象是文件没合并,进而inode不足。(我们的inode的quota不足实际是)。 > > > 4个task节点,source、compact-coordinator、compact-operator、partition-commiter,分别考虑什么设置并行度呢,仅针对能设置的部分。 > 比如souce部分我主要考虑数据量,不清楚这个compact-operator的并行主要考虑啥,也是数据量吗? > > > 此外,也没有可能kafka2hdfs和compact分成2个线做,互不影响。 > > Caizhi Weng 于2021年11月5日周五 下午1:35写道: > >> Hi! >> >> 1 换言之,是针对每个检查点,合并了多个并发subtask产生的文件对吧。 >> >> 正确 >> >> 2 除此以外,多个检查点之间的文件是没有办法合并的对吧。 >> >> 正确 >> >> 实际部分节点做的是后台IO了事情,是不是反映不到busy情况上 >> >> 是的,busy 的计算方式是通过采样看有多少个线程正在工作。对于 sink 这种线程都在等待后台 io 的节点来说确实 busy 值不会很高。 >> >> yidan zhao 于2021年11月4日周四 下午5:57写道: >> >> > hi,还想继续问下。这个合并机制,根据文档介绍如下。 >> > Whether to enable automatic compaction in streaming sink or not. The >> data >> > will be written to temporary files. After the checkpoint is completed, >> the >> > temporary files generated by a checkpoint will be compacted. The >> temporary >> > files are invisible before compaction. >> > 看文档,是指每次检查点完成后,会将单个检查点产生的文件进行合并。也就是说只有单个检查点产生的文件会被合并。 >> > 1 换言之,是针对每个检查点,合并了多个并发subtask产生的文件对吧。 >> > >> > 2 除此以外,多个检查点之间的文件是没有办法合并的对吧。 >> > >> > 3 另外一个问题:目前看flinksql写hive,streaming情况。从web >> > >> > >> ui上看不开启compact情况下,几乎每个节点都是蓝色,而且数据量不大。开启compact情况,几乎也都是蓝色,数据量也不大,但只有compact节点是持续红色。 >> > >> > >> 按照我的理解写hive这种情况下,实际部分节点做的是后台IO了事情,是不是反映不到busy情况上,busy比如只考虑对接受元素的处理,至于这个元素导致这个算子有多少background的工作并反映不出来。对吗。 >> > 所以即使看起来都是蓝色的,也不能降低并行度,而是自行根据数据量采用一个差不多的并行度。 >> > >> >
flink 1.12.1 写入hdfs 存在inProgress文件
Hi! 我们现在使用flink1.12.1版本的flink, 使用StreamingFileSink写入hdfs, 发现存在以下两个问题点。 1: 官方文档明确提示出: 任务关闭未采用savepoint/ checkpoint 重启 会导致 最后一批的文件无法复原的状态,保持在 inProgress 状态 例如: 路径 /data/user/test 中存在三个文件 part-0-0 part-0-1 .part-0-2.inprogress.952eb958-dac9-4f2c-b92f-9084ed536a1c 2: 任务再次启动产生的Buckets 的 永远从 0开始往上递增,上一次任务产生了一些文件。 同一个任务再次启动后文件: part-0-0 part-0-1 .part-0-2.inprogress.952eb958-dac9-4f2c-b92f-9084ed536a1c .part-0-0.inprogress.0e2f234b-042d-4232-a5f7-c980f04ca82d文件 由于part-0-0文件已经存在,.part-0-0.inprogress.0e2f234b-042d-4232-a5f7-c980f04ca82d 始终无法变更成part-0-0。
Re: flink 1.13.2 在 Java/Scala 程序中调用 Python UDF函数,通过yarn-application执行,yarn集群的每台机器都需要安装pyflink?
FYI On Wed, Nov 10, 2021 at 9:31 AM Dian Fu wrote: > 也可以通过以下方式: > - Python libraries [1]: 把PyFlink以及其他相关依赖打包,作为依赖指定 > - Python archieves [2]: 构建Python虚拟环境,并在里面安装PyFlink以及其他依赖,作为依赖指定 > > > 但是上述方式相对于直接在集群上安装来说,提交作业的时候,Flink内部需要把相关文件分发到集群节点上,如果文件比较大,有一点的overhead,会降低启动速度。 > > [1] > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/dependency_management/#python-libraries > [2] > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/dependency_management/#archives > > On Tue, Nov 9, 2021 at 2:01 PM Asahi Lee <978466...@qq.com.invalid> wrote: > >> HI! >> 我现在使用flink 1.13.2,通过java table api开发应用,其中要使用python >> udf函数,最终通过yarn-application方式提交;那我需要在yarn集群的机器上都安装pyflink?还是有其他方案? > >
Re: flink 1.13.2 在 Java/Scala 程序中调用 Python UDF函数,通过yarn-application执行,yarn集群的每台机器都需要安装pyflink?
也可以通过以下方式: - Python libraries [1]: 把PyFlink以及其他相关依赖打包,作为依赖指定 - Python archieves [2]: 构建Python虚拟环境,并在里面安装PyFlink以及其他依赖,作为依赖指定 但是上述方式相对于直接在集群上安装来说,提交作业的时候,Flink内部需要把相关文件分发到集群节点上,如果文件比较大,有一点的overhead,会降低启动速度。 [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/dependency_management/#python-libraries [2] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/dependency_management/#archives On Tue, Nov 9, 2021 at 2:01 PM Asahi Lee <978466...@qq.com.invalid> wrote: > HI! > 我现在使用flink 1.13.2,通过java table api开发应用,其中要使用python > udf函数,最终通过yarn-application方式提交;那我需要在yarn集群的机器上都安装pyflink?还是有其他方案?
Could not start rest endpoint on any port in port range 8081报错,但是8081未占用啊
大佬好! flink ./start-cluster.sh 启动过程中如下错误,但是该端口号未占用啊,请大佬帮看看。谢谢 org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint StandaloneSessionClusterEntrypoint. at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:212) ~[flink-dist_2.11-1.13.3.jar:1.13.3] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600) [flink-dist_2.11-1.13.3.jar:1.13.3] at org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:59) [flink-dist_2.11-1.13.3.jar:1.13.3] Caused by: org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent. at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:275) ~[flink-dist_2.11-1.13.3.jar:1.13.3] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250) ~[flink-dist_2.11-1.13.3.jar:1.13.3] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189) ~[flink-dist_2.11-1.13.3.jar:1.13.3] at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_171] at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_171] at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) ~[flink-shaded-hadoop-2-uber-2.7.5-9.0.jar:2.7.5-9.0] at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[flink-dist_2.11-1.13.3.jar:1.13.3] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186) ~[flink-dist_2.11-1.13.3.jar:1.13.3] ... 2 more Caused by: java.net.BindException: Could not start rest endpoint on any port in port range 8081 at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:234) ~[flink-dist_2.11-1.13.3.jar:1.13.3] at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172) ~[flink-dist_2.11-1.13.3.jar:1.13.3] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250) ~[flink-dist_2.11-1.13.3.jar:1.13.3] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189) ~[flink-dist_2.11-1.13.3.jar:1.13.3] at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_171] at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_171] at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) ~[flink-shaded-hadoop-2-uber-2.7.5-9.0.jar:2.7.5-9.0] at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[flink-dist_2.11-1.13.3.jar:1.13.3] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186) ~[flink-dist_2.11-1.13.3.jar:1.13.3] ... 2 more
求使用oracle jdk8的flink docker镜像Dockerfile
查了下flink官方docker image https://github.com/apache/flink-docker 是基于openjdk的,体积虽然小,但少了很多工具,例如jstack,jps, jstat, jmap等。 当作业出现问题时这些工具可以派上用场。问一下要怎么换成oracle jdk8? 求一份 Dockerfile,谢谢!
Re:回复: Re: 提交flink作业抛 java.lang.LinkageError
试过设置成provided,作业运行会抛ClassNotFoundError: org/apache/kafka/clients/consumer/ConsumerRecord 在 2021-11-09 09:54:59,"WuKong" 写道: >Hi : >看报错日志,还是类加载问题 提示的报错信息 是说已经由不同类加载器已经加装了改依赖。如果生产环境上已经由了相关依赖包,建议将依赖设置为provided > >Caused by: java.lang.LinkageError: loader constraint violation: loader >>> (instance of org/apache/flink/util/ChildFirstClassLoader) previously >>> initiated loading for a different type with name >>> "org/apache/kafka/clients/consumer/ConsumerRecord" > > > >--- >Best, >WuKong > >发件人: casel.chen >发送时间: 2021-11-08 14:38 >收件人: user-zh >主题: Re:Re: 提交flink作业抛 java.lang.LinkageError >版本是一致的,都是1.12.5版本 > > > > >在 2021-11-08 11:11:35,"Shuiqiang Chen" 写道: >>Hi, >> >>能检查下作业jar里 kafka client的版本和平台上的是否一致吗? >> >>casel.chen 于2021年11月5日周五 下午11:25写道: >> >>> 我在公司实时计算平台上提交了一个streaming api写的作业,结果抛如下异常。因为我们的实时计算平台是以flink >>> sql为主的,上面已经集成了flink-kafka-connector。而我提交的作业也是需要从kafka消费,所以将相同版本的flink kafka >>> connector也打进了作业jar包内。请问是什么原因造成的,需要如何修复?谢谢! >>> >>> >>> 2021-11-05 16:38:58 - [submit-session-executor-6] ERROR >>> c.h.s.launcher.AbstractJobExecutor - -start job failed- >>> >>> >>> org.apache.flink.client.program.ProgramInvocationException: The program >>> caused an error: >>> >>> >>> >>> >>> >>> >>> Classpath: >>> [file:/opt/streamsql/jobs/aml-aml-aml/aml-datasync/TEST/aml-datasync-1.0-SNAPSHOT_zwb3274543418822102949.jar] >>> >>> >>> >>> >>> >>> >>> System.out: (none) >>> >>> >>> >>> >>> >>> >>> System.err: (none) >>> >>> >>> at >>> org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:264) >>> >>> >>> at >>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:172) >>> >>> >>> at >>> com.huifu.streamsql.launcher.AbstractJobExecutor.createJobGraph(AbstractJobExecutor.java:205) >>> >>> >>> at >>> com.huifu.streamsql.launcher.standalone.RemoteExecutor.doStart(RemoteExecutor.java:31) >>> >>> >>> at >>> com.huifu.streamsql.launcher.AbstractJobExecutor.start(AbstractJobExecutor.java:51) >>> >>> >>> at com.huifu.streamsql.launcher.JobCommand$1.execute(JobCommand.java:15) >>> >>> >>> at >>> com.huifu.streamsql.service.StreamSqlServiceImpl.submitJob(StreamSqlServiceImpl.java:443) >>> >>> >>> at >>> com.huifu.kunpeng.service.DeploymentServiceImpl.submitJob(DeploymentServiceImpl.java:1662) >>> >>> >>> at >>> com.huifu.kunpeng.service.DeploymentServiceImpl.launchDeployment(DeploymentServiceImpl.java:1623) >>> >>> >>> at >>> com.huifu.kunpeng.service.DeploymentServiceImpl$$FastClassBySpringCGLIB$$855501cb.invoke() >>> >>> >>> at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) >>> >>> >>> at >>> org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771) >>> >>> >>> at >>> org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) >>> >>> >>> at >>> org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) >>> >>> >>> at >>> org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor.invoke(AnnotationAwareRetryOperationsInterceptor.java:156) >>> >>> >>> at >>> org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) >>> >>> >>> at >>> org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) >>> >>> >>> at >>> org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691) >>> >>> >>> at >>> com.huifu.kunpeng.service.DeploymentServiceImpl$$EnhancerBySpringCGLIB$$9aed5b42.launchDeployment() >>> >>> >>> at >>> com.huifu.kunpeng.runner.SubmitQueueApplicationRunner.lambda$run$0(SubmitQueueApplicationRunner.java:63) >>> >>> >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >>> >>> >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>> >>> >>> at java.lang.Thread.run(Thread.java:748) >>> >>> >>> Caused by: java.lang.LinkageError: loader constraint violation: loader >>> (instance of org/apache/flink/util/ChildFirstClassLoader) previously >>> initiated loading for a different type with name >>> "org/apache/kafka/clients/consumer/ConsumerRecord" >>> >>> >>> at java.lang.ClassLoader.defineClass1(Native Method) >>> >>> >>> at java.lang.ClassLoader.defineClass(ClassLoader.java:756) >>> >>> >>> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) >>> >>> >>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) >>> >>> >>> at java.net.URLClassLoader.access$100(URLClassLoader.java:74) >>> >>> >>> at java.net.URLClassLoader$1.run(URLClassLoader.java:369) >>> >>> >>> at java.net.URLClassLoader$1.run(URLClassLoader.java:363) >>> >>> >>> at java.security.AccessController.doPrivileged(Native Method)