Re: Mapstate got wrong UK when restored.
Hi David, Thanks a lot, I think I get the point now. When I use initializeState to restore the mapstate, the task does not have a current key at that moment, so I only get the key but not the UK; when I use the mapstate in processElement, a key is provided implicitly, so I get the right UK and UV. But I still think I should get the (key, mapstate(UK,UV)) pairs at initializeState, not the (key, UV) ones. Anyway, I have changed my code to just use the mapstate provided by flink. Thanks. Yours sincerely Josh David Morávek 于2021年12月29日周三 23:01写道: > The problem is that you're not actually using the underlying state during > runtime, but instead you're simply using a java map abstraction. This > property ("Map state") is simply bound to the UDF lifecycle > and doesn't share the semantics of the keyed state. > > You should be using the "MapState" property directly to get the guarantees > you're looking for. Then you also won't need to override the snapshot / > initialize state methods, which simplifies the code a lot. > > D. > > On Wed, Dec 29, 2021 at 2:08 PM Joshua Fan wrote: > >> Hi David, >> Thanks for you reply. >> Yes, for keyed state, every state is referenced by a particular key, but >> I would guess it is a flink sdk issue, I mean, the keyed state maybe saved >> as (key, keyed state), as for my situation, it is (key, mapstate(UK,UV)), >> I think the key of this pair is not easy to get by user, when I do >> mapstate.keyset I want to get the UK set, not the key set. According to my >> job, the (key, mapstate(UK,UV)) can be get successfully when job is >> running, but when job restarts from a checkpoint, the restored mapstate, >> the pair seemed be changed to (key, UV), the UK just gone, I can not find >> back the UK. I think the key of (key, mapstate(UK,UV)) will be implictly >> added when write or read from the state by flink. >> So, I am still not clear why I get the key but not the UK. >> >> Yours >> Josh >> >> David Morávek 于2021年12月29日周三 17:32写道: >> >>> Hi Josh, >>> >>> it's important bit to understand is that the MapState (or any other >>> keyed state) is scoped per *key* [1]. You can think about it in a way, >>> that for each key you have a separate "map" that backs it. This is the >>> important concept behind distributed stream processing, that allows you to >>> parallelize the computation and still make sure, that all data for the same >>> key end up in the same partition. >>> >>> Does this answer your question? >>> >>> [1] >>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/#keyed-state >>> >>> Best, >>> D. >>> >>
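For reference, a minimal sketch of the pattern David suggests: declare the MapState through a MapStateDescriptor in open() and use it directly in processElement, with no snapshotState/initializeState overrides. It assumes the operator key is the entity name (String), the UK is a minute timestamp (Long) and the UV is a list of entities; DataEntity, its getTimestamp() getter and the descriptor name are stand-ins for Josh's own code, not his actual classes.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class EntityKeyedProcessFunction
        extends KeyedProcessFunction<String, DataEntity, String> {

    // Backed by the keyed state backend (RocksDB in Josh's job); Flink
    // checkpoints and restores it per key, no manual snapshot handling needed.
    private transient MapState<Long, List<DataEntity>> entityStates;

    @Override
    public void open(Configuration parameters) {
        MapStateDescriptor<Long, List<DataEntity>> descriptor =
                new MapStateDescriptor<>(
                        "entity-states",
                        TypeInformation.of(Long.class),
                        TypeInformation.of(new TypeHint<List<DataEntity>>() {}));
        entityStates = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void processElement(DataEntity value, Context ctx, Collector<String> out)
            throws Exception {
        // Coalesce the (assumed) event timestamp down to the minute; this is the UK.
        long minute = value.getTimestamp() - value.getTimestamp() % 60_000L;

        List<DataEntity> bucket = entityStates.get(minute);
        if (bucket == null) {
            bucket = new ArrayList<>();
        }
        bucket.add(value);
        entityStates.put(minute, bucket);

        // ctx.getCurrentKey() returns the operator key (the entity name), while
        // entityStates.keys() iterates only the UKs (minute buckets) of that key.
    }
}

Because the state is always accessed with a current key set, keys() here yields UKs; in a hand-rolled initializeState there is no current key, which is exactly the confusion discussed in this thread.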
Re: Mapstate got wrong UK when restored.
Hi David, Thanks for your reply. Yes, for keyed state, every state is referenced by a particular key, but I would guess it is a flink sdk issue. I mean, the keyed state may be saved as (key, keyed state); in my situation it is (key, mapstate(UK,UV)). I think the key of this pair is not easy for the user to get, and when I call mapstate.keySet I want to get the UK set, not the key set. In my job, the (key, mapstate(UK,UV)) pairs can be read successfully while the job is running, but when the job restarts from a checkpoint, the restored mapstate pairs seem to have changed to (key, UV); the UK is just gone and I cannot find it back. I think the key of (key, mapstate(UK,UV)) is implicitly added by flink when writing to or reading from the state. So I am still not clear why I get the key but not the UK. Yours Josh David Morávek 于2021年12月29日周三 17:32写道: > Hi Josh, > > it's important bit to understand is that the MapState (or any other keyed > state) is scoped per *key* [1]. You can think about it in a way, that for > each key you have a separate "map" that backs it. This is the important > concept behind distributed stream processing, that allows you to > parallelize the computation and still make sure, that all data for the same > key end up in the same partition. > > Does this answer your question? > > [1] > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/#keyed-state > > Best, > D. >
Mapstate got wrong UK when restored.
Hi All My flink version is 1.11, the statebackend is rocksdb, and I want to write a flink job to implement an adaptive window. I wrote a flink dag like below: > DataStream entities = env.addSource(new > EntitySource()).setParallelism(1); > > entities.keyBy(DataEntity::getName).process(new > EntityKeyedProcessFunction()).setParallelism(p); > > The important code is the EntityKeyedProcessFunction, it is attached. I have a mapstate in it like 'private transient MapState entityStates;' I print the content of the mapstate when checkpoint completed, the content is ok like below: > 2021-12-28 16:22:05,487 INFO window.EntityKeyedProcessFunction [] - > >the key is 164067960 > 2021-12-28 16:22:05,487 INFO window.EntityKeyedProcessFunction [] - the > value is name = hippopotamus, value = [window.DataEntity@b9266a0, window. > DataEntity@682fce90] > 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the > value is name = crocodile, value = [window.DataEntity@16d20045] > 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the > value is name = dolphin, value = [window.DataEntity@1820c75a, window. > DataEntity@64f3b9f6] > 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the > value is name = Dragonfly, value = [window.DataEntity@2b2ad03] > 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the > value is name = Hedgehog, value = [window.DataEntity@65f39671, window. > DataEntity@2df6b2bf] > 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the > value is name = Bee, value = [window.DataEntity@13249998] > 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the > value is name = cicada, value = [window.DataEntity@7266e125, window. > DataEntity@167cf1ae] > 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the > value is name = Mosquito, value = [window.DataEntity@2596aa5a, window. > DataEntity@603c0804] > 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the > value is name = Crane, value = [window.DataEntity@2a3192e9, window. > DataEntity@3a65398f] > The key of the mapstate is a time which was coalesced to minute. But when the job restarted from a checkpoint, the content of the mapstate changed, actually, the key of the mapstate changed. It would show as below. > 2021-12-28 16:15:45,379 INFO window.EntityKeyedProcessFunction [] - > >the key is carp > 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the > value is name = hippopotamus, value = [window.DataEntity@510d4c4b, window. > DataEntity@7857e387] > 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the > value is name = crocodile, value = [window.DataEntity@31366a33, window. > DataEntity@a62074a] > 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the > value is name = Dragonfly, value = [window.DataEntity@56db63fa, window. > DataEntity@54befce0, window.DataEntity@4e7cf96a] > 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the > value is name = Hedgehog, value = [window.DataEntity@7ad09313, window. > DataEntity@592a2955] > 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the > value is name = Mosquito, value = [window.DataEntity@48c05cae] > 2021-12-28 16:15:45,392 INFO window.EntityKeyedProcessFunction [] - the > value is name = duck, value = [window.DataEntity@3e9ef1a4] > 2021-12-28 16:15:45,392 INFO window.EntityKeyedProcessFunction [] - the > value is name = Rhinoceros, value = [window.DataEntity@25f11701, window. 
> DataEntity@2334b667] > 2021-12-28 16:15:45,392 INFO window.EntityKeyedProcessFunction [] - the > value is name = Eagle, value = [window.DataEntity@7574eb4a] > It seems like the key of the restored mapstate is the key of the operator. My minute time was gone, and it is replaced by the key of the operator. It is so weird. Do I misuse the mapstate? Thanks. Yours Josh EntityKeyedProcessFunction.java Description: Binary data
Re: custom flink image error
Finally, I worked out how to build a custom flink image; the Dockerfile is just: > > FROM flink:1.13.1-scala_2.11 > ADD ./flink-s3-fs-hadoop-1.13.1.jar /opt/flink/plugins > ADD ./flink-s3-fs-presto-1.13.1.jar /opt/flink/plugins > The wrong Dockerfile is: > FROM apache/flink:1.13.1-scala_2.11 ADD ./flink-s3-fs-hadoop-1.13.1.jar /opt/flink/plugins ADD ./flink-s3-fs-presto-1.13.1.jar /opt/flink/plugins It uses the wrong base image. I don't know why apache/flink:1.13.1-scala_2.11 is so different from flink:1.13.1-scala_2.11; I have no idea where the apache comes from. Hope you are all doing well. Joshua Fan 于2021年8月5日周四 上午11:42写道: > It seems I set a wrong high-availability.storageDir, > s3://flink-test/recovery can work, but s3:///flink-test/recovery can not, > one / be removed. > > Joshua Fan 于2021年8月5日周四 上午10:43写道: > >> Hi Robert, Tobias >> >> I have tried many ways to build and validate the image. >> >> 1.put the s3 dependency to plugin subdirectory, the Dockerfile content is >> below: >> >>> FROM apache/flink:1.13.1-scala_2.11 >>> ADD ./flink-s3-fs-hadoop-1.13.1.jar >>> /opt/flink/plugins/s3-hadoop/flink-s3-fs-hadoop-1.13.1.jar >>> ADD ./flink-s3-fs-presto-1.13.1.jar >>> /opt/flink/plugins/s3-presto/flink-s3-fs-presto-1.13.1.jar >>> >> This time the image can be run on k8s but would also hit a error like >> "org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not >> find a file system implementation for scheme 's3'.", it seems like flink >> can not find the s3 filesystem supports dynamically. When I want to run the >> image using 'docker run -it ', it would also report >> 'standard_init_linux.go:190: exec user process caused "exec format error"'. >> >> 2.put the s3 dependency to plugin directly, the Dockerfile content is >> below: >> >>> FROM apache/flink:1.13.1-scala_2.11 >>> ADD ./flink-s3-fs-hadoop-1.13.1.jar /opt/flink/plugins >>> ADD ./flink-s3-fs-presto-1.13.1.jar /opt/flink/plugins >>> >> The image can not run on the k8s and report error just the same as run >> the image using 'docker run -it ', 'standard_init_linux.go:190: exec user >> process caused "exec format error"'. >> >> 3.just run the community edition image flink:1.13.1-scala_2.11 locally as >> docker run -it, it will also hit the same error >> 'standard_init_linux.go:190: exec user process caused "exec format error"', >> but the flink:1.13.1-scala_2.11 can be run on the k8s without s3 >> requirement. >> >> 4.import the s3 dependency as a kubernetes parameter >> I submit the session with ' >> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.13.0.jar >> \ >> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS= >> flink-s3-fs-hadoop-1.13.0.jar', the session can be start, but report >> error as below >> >>> Caused by: java.lang.NullPointerException: null uri host.
>>> at java.util.Objects.requireNonNull(Objects.java:228) >>> at >>> org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:72) >>> at >>> org.apache.hadoop.fs.s3a.S3AFileSystem.setUri(S3AFileSystem.java:467) >>> at >>> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:234) >>> at >>> org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:123) >>> >> but I have set the s3 staff in the flink-conf.yaml as below: >> >>> high-availability: >>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory >> >> high-availability.storageDir: s3:///flink-test/recovery >> >> s3.endpoint: http://xxx.yyy.zzz.net >> >> s3.path.style.access: true >> >> s3.access-key: 111 >> >> s3.secret-key: 222 >> >> I think I supplied all the s3 information in the flink-conf.yaml, but it >> did not work. >> >> I will try other ways to complete the s3 ha on k8s. Thank your guys. >> >> Yours sincerely >> Joshua >> >> Robert Metzger 于2021年8月4日周三 下午11:35写道: >> >>> Hey Joshua, >>> >>> Can you first validate if the docker image you've built is valid by >>> running it locally on your machine? >>> >>> I would recommend putting the s3 filesystem files into the plugins [1] >>> directory to avoid classloading issues. >>> Also, you don't need to build custom images if you want to use
Re: custom flink image error
It seems I set a wrong high-availability.storageDir, s3://flink-test/recovery can work, but s3:///flink-test/recovery can not, one / be removed. Joshua Fan 于2021年8月5日周四 上午10:43写道: > Hi Robert, Tobias > > I have tried many ways to build and validate the image. > > 1.put the s3 dependency to plugin subdirectory, the Dockerfile content is > below: > >> FROM apache/flink:1.13.1-scala_2.11 >> ADD ./flink-s3-fs-hadoop-1.13.1.jar >> /opt/flink/plugins/s3-hadoop/flink-s3-fs-hadoop-1.13.1.jar >> ADD ./flink-s3-fs-presto-1.13.1.jar >> /opt/flink/plugins/s3-presto/flink-s3-fs-presto-1.13.1.jar >> > This time the image can be run on k8s but would also hit a error like > "org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not > find a file system implementation for scheme 's3'.", it seems like flink > can not find the s3 filesystem supports dynamically. When I want to run the > image using 'docker run -it ', it would also report > 'standard_init_linux.go:190: exec user process caused "exec format error"'. > > 2.put the s3 dependency to plugin directly, the Dockerfile content is > below: > >> FROM apache/flink:1.13.1-scala_2.11 >> ADD ./flink-s3-fs-hadoop-1.13.1.jar /opt/flink/plugins >> ADD ./flink-s3-fs-presto-1.13.1.jar /opt/flink/plugins >> > The image can not run on the k8s and report error just the same as run > the image using 'docker run -it ', 'standard_init_linux.go:190: exec user > process caused "exec format error"'. > > 3.just run the community edition image flink:1.13.1-scala_2.11 locally as > docker run -it, it will also hit the same error > 'standard_init_linux.go:190: exec user process caused "exec format error"', > but the flink:1.13.1-scala_2.11 can be run on the k8s without s3 > requirement. > > 4.import the s3 dependency as a kubernetes parameter > I submit the session with ' > -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.13.0.jar > \ > -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS= > flink-s3-fs-hadoop-1.13.0.jar', the session can be start, but report > error as below > >> Caused by: java.lang.NullPointerException: null uri host. >> at java.util.Objects.requireNonNull(Objects.java:228) >> at >> org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:72) >> at >> org.apache.hadoop.fs.s3a.S3AFileSystem.setUri(S3AFileSystem.java:467) >> at >> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:234) >> at >> org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:123) >> > but I have set the s3 staff in the flink-conf.yaml as below: > >> high-availability: >> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory > > high-availability.storageDir: s3:///flink-test/recovery > > s3.endpoint: http://xxx.yyy.zzz.net > > s3.path.style.access: true > > s3.access-key: 111 > > s3.secret-key: 222 > > I think I supplied all the s3 information in the flink-conf.yaml, but it > did not work. > > I will try other ways to complete the s3 ha on k8s. Thank your guys. > > Yours sincerely > Joshua > > Robert Metzger 于2021年8月4日周三 下午11:35写道: > >> Hey Joshua, >> >> Can you first validate if the docker image you've built is valid by >> running it locally on your machine? >> >> I would recommend putting the s3 filesystem files into the plugins [1] >> directory to avoid classloading issues. 
>> Also, you don't need to build custom images if you want to use build-in >> plugins [2] >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/plugins/ >> [2] >> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#using-plugins >> >> On Wed, Aug 4, 2021 at 3:06 PM Joshua Fan wrote: >> >>> Hi All >>> I want to build a custom flink image to run on k8s, below is my >>> Dockerfile content: >>> >>>> FROM apache/flink:1.13.1-scala_2.11 >>>> ADD ./flink-s3-fs-hadoop-1.13.1.jar /opt/flink/lib >>>> ADD ./flink-s3-fs-presto-1.13.1.jar /opt/flink/lib >>>> >>> I just put the s3 fs dependency to the {flink home}/lib, and then I >>> build the image and push it to the repo. >>> >>> When I submit the flink session from the custom image, a error will be >>> reported like "exec /docker-entrypoint.sh failed: Exec format error". >>> >>> I googled a lot, but it seems no useful information. >>> >>> Thanks for your help. >>> >>> Yours sincerely >>> Joshua >>> >>
Re: custom flink image error
Hi Robert, Tobias, I have tried many ways to build and validate the image. 1. Put the s3 dependency into a plugins subdirectory; the Dockerfile content is below: > FROM apache/flink:1.13.1-scala_2.11 > ADD ./flink-s3-fs-hadoop-1.13.1.jar > /opt/flink/plugins/s3-hadoop/flink-s3-fs-hadoop-1.13.1.jar > ADD ./flink-s3-fs-presto-1.13.1.jar > /opt/flink/plugins/s3-presto/flink-s3-fs-presto-1.13.1.jar > This time the image can run on k8s but hits an error like "org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'."; it seems flink cannot find the s3 filesystem support dynamically. When I run the image using 'docker run -it', it also reports 'standard_init_linux.go:190: exec user process caused "exec format error"'. 2. Put the s3 dependency into the plugins directory directly; the Dockerfile content is below: > FROM apache/flink:1.13.1-scala_2.11 > ADD ./flink-s3-fs-hadoop-1.13.1.jar /opt/flink/plugins > ADD ./flink-s3-fs-presto-1.13.1.jar /opt/flink/plugins > The image cannot run on k8s and reports the same error as running the image with 'docker run -it': 'standard_init_linux.go:190: exec user process caused "exec format error"'. 3. Just run the community edition image flink:1.13.1-scala_2.11 locally with docker run -it; it also hits the same error 'standard_init_linux.go:190: exec user process caused "exec format error"', but the flink:1.13.1-scala_2.11 image can run on k8s without the s3 requirement. 4. Import the s3 dependency as a kubernetes parameter. I submit the session with ' -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.13.0.jar \ -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS= flink-s3-fs-hadoop-1.13.0.jar'; the session can be started, but it reports the error below > Caused by: java.lang.NullPointerException: null uri host. > at java.util.Objects.requireNonNull(Objects.java:228) > at > org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:72) > at > org.apache.hadoop.fs.s3a.S3AFileSystem.setUri(S3AFileSystem.java:467) > at > org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:234) > at > org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:123) > but I have set the s3 stuff in the flink-conf.yaml as below: > high-availability: > org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory high-availability.storageDir: s3:///flink-test/recovery s3.endpoint: http://xxx.yyy.zzz.net s3.path.style.access: true s3.access-key: 111 s3.secret-key: 222 I think I supplied all the s3 information in the flink-conf.yaml, but it did not work. I will try other ways to get the s3 ha working on k8s. Thank you guys. Yours sincerely Joshua Robert Metzger 于2021年8月4日周三 下午11:35写道: > Hey Joshua, > > Can you first validate if the docker image you've built is valid by > running it locally on your machine? > > I would recommend putting the s3 filesystem files into the plugins [1] > directory to avoid classloading issues.
> Also, you don't need to build custom images if you want to use build-in > plugins [2] > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/plugins/ > [2] > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#using-plugins > > On Wed, Aug 4, 2021 at 3:06 PM Joshua Fan wrote: > >> Hi All >> I want to build a custom flink image to run on k8s, below is my >> Dockerfile content: >> >>> FROM apache/flink:1.13.1-scala_2.11 >>> ADD ./flink-s3-fs-hadoop-1.13.1.jar /opt/flink/lib >>> ADD ./flink-s3-fs-presto-1.13.1.jar /opt/flink/lib >>> >> I just put the s3 fs dependency to the {flink home}/lib, and then I build >> the image and push it to the repo. >> >> When I submit the flink session from the custom image, a error will be >> reported like "exec /docker-entrypoint.sh failed: Exec format error". >> >> I googled a lot, but it seems no useful information. >> >> Thanks for your help. >> >> Yours sincerely >> Joshua >> >
custom flink image error
Hi All, I want to build a custom flink image to run on k8s; below is my Dockerfile content: > FROM apache/flink:1.13.1-scala_2.11 > ADD ./flink-s3-fs-hadoop-1.13.1.jar /opt/flink/lib > ADD ./flink-s3-fs-presto-1.13.1.jar /opt/flink/lib > I just put the s3 fs dependencies into {flink home}/lib, then built the image and pushed it to the repo. When I submit the flink session from the custom image, an error is reported like "exec /docker-entrypoint.sh failed: Exec format error". I googled a lot, but found no useful information. Thanks for your help. Yours sincerely Joshua
Re: SIGSEGV error
Hi Till, I also tried the job without gzip, and it ran into the same error. But the problem is solved now. When I was about to give up on solving it, I found the mail at http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/JVM-crash-SIGSEGV-in-ZIP-GetEntry-td17326.html. So I think maybe it was something about the serialization stuff. What I did is: before: OperatorStateStore stateStore = context.getOperatorStateStore(); ListStateDescriptor lsd = new ListStateDescriptor("bucket-states", State.class); after: OperatorStateStore stateStore = context.getOperatorStateStore(); ListStateDescriptor lsd = new ListStateDescriptor("bucket-states", new JavaSerializer()); Hope this is helpful. Yours sincerely Josh Till Rohrmann 于2021年5月18日周二 下午2:54写道: > Hi Joshua, > > could you try whether the job also fails when not using the gzip format? > This could help us narrow down the culprit. Moreover, you could try to run > your job and Flink with Java 11 now. > > Cheers, > Till > > On Tue, May 18, 2021 at 5:10 AM Joshua Fan wrote: > >> Hi all, >> >> Most of the posts says that "Most of the times, the crashes in >> ZIP_GetEntry occur when the jar file being accessed has been >> modified/overwritten while the JVM instance was running. ", but do not >> know when and which jar file was modified according to the job running in >> flink. >> >> for your information. >> >> Yours sincerely >> Josh >> >> Joshua Fan 于2021年5月18日周二 上午10:15写道: >> >>> Hi Stephan, Till >>> >>> Recently, I tried to upgrade a flink job from 1.7 to 1.11, >>> unfortunately, the weird problem appeared, " SIGSEGV (0xb) at >>> pc=0x0025, pid=135306, tid=140439001388800". The pid log is >>> attached. >>> Actually, it is a simple job that consumes messages from kafka and >>> writes into hdfs with a gzip format. It can run in 1.11 for about 2 >>> minutes, then the JVM will crash, then job restart and jvm crash again >>> until the application fails. >>> I also tried to set -Dsun.zip.disableMemoryMapping=true,but it turns >>> out helpless, the same crash keeps happening. Google suggests to upgrade >>> jdk to jdk1.9, but it is not feasible. >>> Any suggestions? Thanks a lot. >>> >>> Yours sincerely >>> Josh >>> >>> Stephan Ewen 于2019年9月13日周五 下午11:11写道: >>> >>>> Given that the segfault happens in the JVM's ZIP stream code, I am >>>> curious is this is a bug in Flink or in the JVM core libs, that happens to >>>> be triggered now by newer versions of FLink. >>>> >>>> I found this on StackOverflow, which looks like it could be related: >>>> https://stackoverflow.com/questions/38326183/jvm-crashed-in-java-util-zip-zipfile-getentry >>>> Can you try the suggested option "-Dsun.zip.disableMemoryMapping=true"? >>>> >>>> >>>> On Fri, Sep 13, 2019 at 11:36 AM Till Rohrmann >>>> wrote: >>>> >>>>> Hi Marek, >>>>> >>>>> could you share the logs statements which happened before the SIGSEGV >>>>> with us? They might be helpful to understand what happened before. >>>>> Moreover, it would be helpful to get access to your custom serializer >>>>> implementations. I'm also pulling in Gordon who worked on >>>>> the TypeSerializerSnapshot improvements. >>>>> >>>>> Cheers, >>>>> Till >>>>> >>>>> On Thu, Sep 12, 2019 at 9:28 AM Marek Maj wrote: >>>>> >>>>>> Hi everyone, >>>>>> >>>>>> Recently we decided to upgrade from flink 1.7.2 to 1.8.1. After an >>>>>> upgrade our task managers started to fail with SIGSEGV error from time to >>>>>> time.
>>>>>> >>>>>> In process of adjusting the code to 1.8.1, we noticed that there were >>>>>> some changes around TypeSerializerSnapshot interface and its >>>>>> implementations. At that time we had a few custom serializers which we >>>>>> decided to throw out during migration and then leverage flink default >>>>>> serializers. We don't mind clearing the state in the process of >>>>>> migration, >>>>>> an effort to migrate with state seems to be not worth it. >>>>>> >>>>>> Unfortunately after running new version we see SIGSEGV errors from >>>>>> time to time. It
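For readers hitting the same crash, a fuller sketch of the state-descriptor change Josh describes above. It assumes the JavaSerializer in question is org.apache.flink.runtime.state.JavaSerializer (an internal Flink class) and that State is Josh's own Serializable class; both are assumptions, not taken from his attached code.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.JavaSerializer;

public class BucketStateInit {

    public ListState<State> initializeBucketState(FunctionInitializationContext context)
            throws Exception {
        OperatorStateStore stateStore = context.getOperatorStateStore();

        // Before: the descriptor is built from the class, so Flink derives a
        // serializer from the type information at runtime.
        ListStateDescriptor<State> before =
                new ListStateDescriptor<>("bucket-states", State.class);

        // After: the descriptor is built with an explicit serializer, which is
        // the change that stopped the SIGSEGV in Josh's job.
        ListStateDescriptor<State> after =
                new ListStateDescriptor<>("bucket-states", new JavaSerializer<State>());

        return stateStore.getListState(after);
    }
}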
Re: SIGSEGV error
Hi all, Most of the posts says that "Most of the times, the crashes in ZIP_GetEntry occur when the jar file being accessed has been modified/overwritten while the JVM instance was running. ", but do not know when and which jar file was modified according to the job running in flink. for your information. Yours sincerely Josh Joshua Fan 于2021年5月18日周二 上午10:15写道: > Hi Stephan, Till > > Recently, I tried to upgrade a flink job from 1.7 to 1.11, unfortunately, > the weird problem appeared, " SIGSEGV (0xb) at pc=0x0025, > pid=135306, tid=140439001388800". The pid log is attached. > Actually, it is a simple job that consumes messages from kafka and writes > into hdfs with a gzip format. It can run in 1.11 for about 2 minutes, then > the JVM will crash, then job restart and jvm crash again until the > application fails. > I also tried to set -Dsun.zip.disableMemoryMapping=true,but it turns out > helpless, the same crash keeps happening. Google suggests to upgrade jdk to > jdk1.9, but it is not feasible. > Any suggestions? Thanks a lot. > > Yours sincerely > Josh > > Stephan Ewen 于2019年9月13日周五 下午11:11写道: > >> Given that the segfault happens in the JVM's ZIP stream code, I am >> curious is this is a bug in Flink or in the JVM core libs, that happens to >> be triggered now by newer versions of FLink. >> >> I found this on StackOverflow, which looks like it could be related: >> https://stackoverflow.com/questions/38326183/jvm-crashed-in-java-util-zip-zipfile-getentry >> Can you try the suggested option "-Dsun.zip.disableMemoryMapping=true"? >> >> >> On Fri, Sep 13, 2019 at 11:36 AM Till Rohrmann >> wrote: >> >>> Hi Marek, >>> >>> could you share the logs statements which happened before the SIGSEGV >>> with us? They might be helpful to understand what happened before. >>> Moreover, it would be helpful to get access to your custom serializer >>> implementations. I'm also pulling in Gordon who worked on >>> the TypeSerializerSnapshot improvements. >>> >>> Cheers, >>> Till >>> >>> On Thu, Sep 12, 2019 at 9:28 AM Marek Maj wrote: >>> >>>> Hi everyone, >>>> >>>> Recently we decided to upgrade from flink 1.7.2 to 1.8.1. After an >>>> upgrade our task managers started to fail with SIGSEGV error from time to >>>> time. >>>> >>>> In process of adjusting the code to 1.8.1, we noticed that there were >>>> some changes around TypeSerializerSnapshot interface and its >>>> implementations. At that time we had a few custom serializers which we >>>> decided to throw out during migration and then leverage flink default >>>> serializers. We don't mind clearing the state in the process of migration, >>>> an effort to migrate with state seems to be not worth it. >>>> >>>> Unfortunately after running new version we see SIGSEGV errors from time >>>> to time. It may be that serialization is not the real cause, but at the >>>> moment it seems to be the most probable reason. We have not performed any >>>> significant code changes besides serialization area. >>>> >>>> We run job on yarn, hdp version 2.7.3.2.6.2.0-205. >>>> Checkpoint configuration: RocksDB backend, not incremental, 50s min >>>> processing time >>>> >>>> You can find parts of JobManager log and ErrorFile log of failed >>>> container included below. >>>> >>>> Any suggestions are welcome >>>> >>>> Best regards >>>> Marek Maj >>>> >>>> jobmanager.log >>>> >>>> 019-09-10 16:30:28.177 INFO o.a.f.r.c.CheckpointCoordinator - >>>> Completed checkpoint 47 for job c8a9ae03785ade86348c3189cf7dd965 >>>> (18532488122 bytes in 60871 ms). 
>>>> >>>> 2019-09-10 16:31:19.223 INFO o.a.f.r.c.CheckpointCoordinator - >>>> Triggering checkpoint 48 @ 1568111478177 for job >>>> c8a9ae03785ade86348c3189cf7dd965. >>>> >>>> 2019-09-10 16:32:19.280 INFO o.a.f.r.c.CheckpointCoordinator - >>>> Completed checkpoint 48 for job c8a9ae03785ade86348c3189cf7dd965 >>>> (19049515705 bytes in 61083 ms). >>>> >>>> 2019-09-10 16:33:10.480 INFO o.a.f.r.c.CheckpointCoordinator - >>>> Triggering checkpoint 49 @ 1568111589279 for job >>>> c8a9ae03785ade86348c3189cf7dd965. >>>> >>>> 2019-09-10 16:33:36.773 WARN o.a.f.r.r.h.l.m
Problems about pv uv in flink sql
Hi, I have learned from the community how to do pv/uv in flink sql. One way is to make a yyyyMMdd grouping, the other is to make a day window. Thank you all. I have a question about the result output. For the yyyyMMdd grouping, the database gets a record every minute, and many records accumulate in the database as time goes on, but there would be only a few records in the database with the day window. For example, the pv would be 12:00,100 12:01,200 12:02,300 12:03,400 with the yyyyMMdd grouping solution, while for the day window solution there would be only one record as 12:00,100 |12:01,200|12:02,300|12:03,400. I wonder, for the day window solution, is it possible to have the same result output as the yyyyMMdd solution? Because the day window solution has no worry about state retention. Thanks. Yours sincerely Josh
Re: Flink streaming sql是否支持两层group by聚合
Hi Jark and Benchao, There are three more weird things about the pv uv in Flink SQL. As I described in the above email, I computed the pv uv in two methods; I list them below: For the day grouping one, the sql is > insert into pvuv_sink > select a,v,MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) dt, > COUNT(m2) AS pv, > COUNT(DISTINCT m2) AS uv from kafkaTable GROUP BY DATE_FORMAT(ts, > 'yyyy-MM-dd'),a,v; > And the result of one dimension is [image: result_day_grouping.png] For the 1 day window one, the sql is > insert into pvuv_sink select a,v,MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) dt, COUNT(m2) AS pv, COUNT(DISTINCT m2) AS uv from kafkaTable GROUP BY tumble(ts, interval '1' > day),a,v; And the result of one dimension is [image: result_1day_window.png] Here are the three questions: 1. With the same cpu, memory and parallelism, the day grouping solution is faster than the 1 day window solution: the day grouping solution took 1 hour to consume all the data, but the 1 day window solution took 4 hours. 2. The final results are not the same: the pv/uv of the day grouping is 7304086/7299878, but the pv/uv of the 1 day window is 7304352/7300144. I think both of the results are not exact but approximate? So, how much accuracy is lost? What is the algorithm behind the count distinct? 3. As the picture of the 1 day window shows, there are many records for a=1, v=12.0.6.1, dt=2021-01-13 17:45:00, but in my last mail I noticed the records kept changing while the job was executing, with one record per dimension; now at the final time, so many records per dimension popped up, which is weird. Any advice will be fully appreciated. Yours sincerely Josh On Wed, Jan 13, 2021 at 7:24 PM Joshua Fan wrote: > Hi Jark and Benchao > > I have learned from your previous email on how to do pv/uv in flink sql. > One is to make a MMdd grouping, the other is to make a day window. > Thank you all. > > I have a question about the result output. For MMdd grouping, every > minute the database would get a record, and many records would be in the > database as time goes on, but there would be only a few records in the > database according to the day window. > > for example, the pv would be 12:00,100 12:01,200 12:02,300 12:03,400 > according to the MMdd grouping solution, for the day window solution, > there would be only one record as 12:00,100 |12:01,200|12:02,300|12:03,400. > > I wonder, for the day window solution, is it possible to have the same > result output as the MMdd solution? because the day window solution has > no worry about the state retention. > > Thanks. > > Yours sincerely > > Josh > > On Sat, Apr 18, 2020 at 9:38 PM Jark Wu wrote: > >> Hi, >> >> I will use English because we are also sending to user@ ML. >> >> This behavior is as expected, not a bug. Benchao gave a good explanation >> about the reason. I will give some further explanation. >> In Flink SQL, we will split an update operation (such as uv from 100 -> >> 101) into two separate messages, one is -[key, 100], the other is +[key, >> 101]. >> Once these two messages arrive the downstream aggregation, it will also >> send two result messages (assuming the previous SUM(uv) is 500), >> one is [key, 400], the other is [key, 501]. >> >> But this problem is almost addressed since 1.9, if you enabled the >> mini-batch optimization [1]. Because mini-batch optimization will try best >> to the >> accumulate the separate + and - message in a single mini-batch >> processing. You can upgrade and have a try.
>> >> Best, >> Jark >> >> [1]: >> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation >> >> >> >> On Sat, 18 Apr 2020 at 12:26, Benchao Li wrote: >> >>> 这个按照目前的设计,应该不能算是bug,应该是by desigh的。 >>> 主要问题还是因为有两层agg,第一层的agg的retract会导致第二层的agg重新计算和下发结果。 >>> >>> dixingxing85 于2020年4月18日 周六上午11:38写道: >>> >>>> 多谢benchao, >>>> 我这个作业的结果预期结果是每天只有一个结果,这个结果应该是越来越大的,比如: >>>> 20200417,86 >>>> 20200417,90 >>>> 20200417,130 >>>> 20200417,131 >>>> >>>> 而不应该是忽大忽小的,数字由大变小,这样的结果需求方肯定不能接受的: >>>> 20200417,90 >>>> 20200417,86 >>>> 20200417,130 >>>> 20200417,86 >>>> 20200417,131 >>>> >>>> 我的疑问是内层的group by产生的retract流,会影响sink吗,我是在sink端打的日志。 >>>> 如果flink支持这种两层group by的话,那这种结果变小的情况应该算是bug吧? >>>> >>>> Sent from my iPhone >>>> >>>> On Apr 18, 2020, at 10:08, Benchao
Re: Flink streaming sql是否支持两层group by聚合
Hi Jark and Benchao I have learned from your previous email on how to do pv/uv in flink sql. One is to make a MMdd grouping, the other is to make a day window. Thank you all. I have a question about the result output. For MMdd grouping, every minute the database would get a record, and many records would be in the database as time goes on, but there would be only a few records in the database according to the day window. for example, the pv would be 12:00,100 12:01,200 12:02,300 12:03,400 according to the MMdd grouping solution, for the day window solution, there would be only one record as 12:00,100 |12:01,200|12:02,300|12:03,400. I wonder, for the day window solution, is it possible to have the same result output as the MMdd solution? because the day window solution has no worry about the state retention. Thanks. Yours sincerely Josh On Sat, Apr 18, 2020 at 9:38 PM Jark Wu wrote: > Hi, > > I will use English because we are also sending to user@ ML. > > This behavior is as expected, not a bug. Benchao gave a good explanation > about the reason. I will give some further explanation. > In Flink SQL, we will split an update operation (such as uv from 100 -> > 101) into two separate messages, one is -[key, 100], the other is +[key, > 101]. > Once these two messages arrive the downstream aggregation, it will also > send two result messages (assuming the previous SUM(uv) is 500), > one is [key, 400], the other is [key, 501]. > > But this problem is almost addressed since 1.9, if you enabled the > mini-batch optimization [1]. Because mini-batch optimization will try best > to the > accumulate the separate + and - message in a single mini-batch processing. > You can upgrade and have a try. > > Best, > Jark > > [1]: > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation > > > > On Sat, 18 Apr 2020 at 12:26, Benchao Li wrote: > >> 这个按照目前的设计,应该不能算是bug,应该是by desigh的。 >> 主要问题还是因为有两层agg,第一层的agg的retract会导致第二层的agg重新计算和下发结果。 >> >> dixingxing85 于2020年4月18日 周六上午11:38写道: >> >>> 多谢benchao, >>> 我这个作业的结果预期结果是每天只有一个结果,这个结果应该是越来越大的,比如: >>> 20200417,86 >>> 20200417,90 >>> 20200417,130 >>> 20200417,131 >>> >>> 而不应该是忽大忽小的,数字由大变小,这样的结果需求方肯定不能接受的: >>> 20200417,90 >>> 20200417,86 >>> 20200417,130 >>> 20200417,86 >>> 20200417,131 >>> >>> 我的疑问是内层的group by产生的retract流,会影响sink吗,我是在sink端打的日志。 >>> 如果flink支持这种两层group by的话,那这种结果变小的情况应该算是bug吧? >>> >>> Sent from my iPhone >>> >>> On Apr 18, 2020, at 10:08, Benchao Li wrote: >>> >>> >>> >>> Hi, >>> >>> 这个是支持的哈。 >>> 你看到的现象是因为group by会产生retract结果,也就是会先发送-[old],再发送+[new]. 
>>> 如果是两层的话,就成了: >>> 第一层-[old], 第二层-[cur], +[old] >>> 第一层+[new], 第二层[-old], +[new] >>> >>> dixingxin...@163.com 于2020年4月18日周六 上午2:11写道: >>> Hi all: 我们有个streaming sql得到的结果不正确,现象是sink得到的数据一会大一会小,*我们想确认下,这是否是个bug, 或者flink还不支持这种sql*。 具体场景是:先group by A, B两个维度计算UV,然后再group by A 把维度B的UV sum起来,对应的SQL如下:(A -> dt, B -> pvareaid) SELECT dt, SUM(a.uv) AS uv FROM ( SELECT dt, pvareaid, COUNT(DISTINCT cuid) AS uv FROM streaming_log_event WHERE action IN ('action1') AND pvareaid NOT IN ('pv1', 'pv2') AND pvareaid IS NOT NULL GROUP BY dt, pvareaid ) a GROUP BY dt; sink接收到的数据对应日志为: 2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) (GeneralRedisSinkFunction.invoke:169) - receive data(false,0,86,20200417) 2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) (GeneralRedisSinkFunction.invoke:169) - receive data(true,0,130,20200417) 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) (GeneralRedisSinkFunction.invoke:169) - receive data(false,0,130,20200417) 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) (GeneralRedisSinkFunction.invoke:169) - receive data(true,0,86,20200417) 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) (GeneralRedisSinkFunction.invoke:169) - receive data(false,0,86,20200417) 2020-04-17 22:28:39,328INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) (GeneralRedisSinkFunction.invoke:169) - receive data(true,0,131,20200417) 我们使用的是1.7.2, 测试作业的并行度为1。 这是对应的 issue: https://issues.apache.org/jira/browse/FLINK-17228 -- dixingxin...@163.com >>> >>> -- >>> >>> Benchao Li >>> School of Electronics Engineering and Computer Science, Peking University >>> Tel:+86-15650713730 >>> Email: libenc...@gmail.com; libenc...@pku.edu.cn >>> >>> -- >> >> Benchao Li >> School of Electronics Engineering and Computer Science, Peking University >> Tel:+86-15650713730 >> Email: libenc...@gmail.com; libenc...@pku.edu.cn >> >>
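Jark's mini-batch suggestion quoted above can be turned on through the table configuration; below is a minimal sketch, assuming Flink 1.11+ where these options are available, with illustrative latency/size values. Mini-batching buffers input records briefly so the -[old]/+[new] retraction pairs emitted by the inner aggregation are largely folded before they reach the outer aggregation and the sink.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MiniBatchConfigExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Enable mini-batch aggregation (documented table config options).
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.mini-batch.enabled", "true");
        // How long to buffer input records before flushing a mini-batch.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.mini-batch.allow-latency", "2 s");
        // Maximum number of input records buffered before a flush.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.mini-batch.size", "5000");

        // The pv/uv statements from this thread can then be submitted with
        // tEnv.executeSql(...) as before.
    }
}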
Does flink have a plan to support flink sql udf of any language?
Hi, Does the flink community have a plan to support flink sql udf in any language? For example, a udf in c or php. In my company, many developers do not know java or scala; they use c in their usual work. For now we have a workaround to support this situation: we create a process running the c logic, and the process communicates with flink sql through a wrapper udf function. So, here I have two questions: 1. Does the flink community have a plan to support flink sql udf in any language? 2. If the answer to question 1 is no, is there any possibility to push our solution into flink? Is this udf-process stuff in line with the ideas of the community about flink sql? Thanks Yours Sincerely Josh
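As a rough illustration of the wrapper-UDF workaround described above (and only of that; it is not an existing Flink feature): a Java ScalarFunction can start the external C program once and exchange one line per row over stdin/stdout. The binary path, the class name and the line-based protocol are made up for the sketch, and batching, timeouts and error handling are omitted.

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class ExternalProcessUdf extends ScalarFunction {

    private transient Process process;
    private transient BufferedWriter toProcess;
    private transient BufferedReader fromProcess;

    @Override
    public void open(FunctionContext context) throws Exception {
        // Start the external program once per UDF instance (path is hypothetical).
        process = new ProcessBuilder("/opt/udf/my_c_udf").start();
        toProcess = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));
        fromProcess = new BufferedReader(new InputStreamReader(process.getInputStream()));
    }

    // One request/response round trip per row; the external program is expected
    // to answer every input line with exactly one output line.
    public String eval(String input) {
        try {
            toProcess.write(input);
            toProcess.newLine();
            toProcess.flush();
            return fromProcess.readLine();
        } catch (IOException e) {
            throw new RuntimeException("External udf process failed", e);
        }
    }

    @Override
    public void close() throws Exception {
        if (process != null) {
            process.destroy();
        }
    }
}

The function would be registered like any other UDF and called from SQL; the per-row round trip is the price of keeping the business logic in C.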
Re: why not flink delete the checkpoint directory recursively?
Hi Roman and Robert, Thank you. I have checked the code and the checkpoint deletion failure case. Yes, Flink deletes the meta file and the operator state files first, and then deletes the checkpoint dir, which by that point is an empty dir. The root cause of the checkpoint deletion failure is that the hadoop delete checks whether the path is a directory together with the recursive parameter. I will work with the people who are in charge of the hdfs to solve this problem. Thanks again. Yours sincerely Josh On Tue, Nov 17, 2020 at 6:36 PM Khachatryan Roman < khachatryan.ro...@gmail.com> wrote: > Hi, > > I think Robert is right, state handles are deleted first, and then the > directory is deleted non-recursively. > If any exception occurs while removing the files, it will be combined with > the other exception (as suppressed). > So probably Flink failed to delete some files and then directory removal > failed because of that. > Can you share the full exception to check this? > And probably check what files exist there as Robert suggested. > > Regards, > Roman > > > On Tue, Nov 17, 2020 at 10:38 AM Joshua Fan > wrote: > >> Hi Robert, >> >> When the `delete(Path f, boolean recursive)` recursive is false, hdfs >> will throw exception like below: >> [image: checkpoint-exception.png] >> >> Yours sincerely >> Josh >> >> On Thu, Nov 12, 2020 at 4:29 PM Robert Metzger >> wrote: >> >>> Hey Josh, >>> >>> As far as I understand the code CompletedCheckpoint.discard(), Flink is >>> removing all the files in StateUtil.bestEffortDiscardAllStateObjects, then >>> deleting the directory. >>> >>> Which files are left over in your case? >>> Do you see any exceptions on the TaskManagers? >>> >>> Best, >>> Robert >>> >>> On Wed, Nov 11, 2020 at 12:08 PM Joshua Fan >>> wrote: >>> >>>> Hi >>>> >>>> When a checkpoint should be deleted, >>>> FsCompletedCheckpointStorageLocation.disposeStorageLocation will be >>>> called. >>>> Inside it, fs.delete(exclusiveCheckpointDir, false) will do the delete >>>> action. I wonder why the recursive parameter is set to false? as the >>>> exclusiveCheckpointDir is truly a directory. in our hadoop, this >>>> causes the checkpoint cannot be removed. >>>> It is easy to change the recursive parameter to true, but is there any >>>> potential harm? >>>> >>>> Yours sincerely >>>> Josh >>>> >>>>
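To make the non-recursive behaviour concrete, here is a small, illustration-only sketch against the Hadoop FileSystem API (the path is made up): delete(path, false) succeeds on an empty checkpoint directory, but fails when anything is still inside it, which matches the exception Josh saw.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckpointDirCleanup {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path chkDir = new Path("/flink/checkpoints/example-job/chk-42");

        // Flink discards the state handles (metadata and operator state files)
        // first, so by the time it issues this call the directory is expected
        // to be empty. With recursive = false, HDFS refuses to delete a
        // non-empty directory and throws an IOException instead.
        fs.delete(chkDir, false);
    }
}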
Re: why not flink delete the checkpoint directory recursively?
Hi Robert, When the `delete(Path f, boolean recursive)` recursive is false, hdfs will throw exception like below: [image: checkpoint-exception.png] Yours sincerely Josh On Thu, Nov 12, 2020 at 4:29 PM Robert Metzger wrote: > Hey Josh, > > As far as I understand the code CompletedCheckpoint.discard(), Flink is > removing all the files in StateUtil.bestEffortDiscardAllStateObjects, then > deleting the directory. > > Which files are left over in your case? > Do you see any exceptions on the TaskManagers? > > Best, > Robert > > On Wed, Nov 11, 2020 at 12:08 PM Joshua Fan > wrote: > >> Hi >> >> When a checkpoint should be deleted, FsCompletedCheckpointStorageLocation >> .disposeStorageLocation will be called. >> Inside it, fs.delete(exclusiveCheckpointDir, false) will do the delete >> action. I wonder why the recursive parameter is set to false? as the >> exclusiveCheckpointDir is truly a directory. in our hadoop, this causes >> the checkpoint cannot be removed. >> It is easy to change the recursive parameter to true, but is there any >> potential harm? >> >> Yours sincerely >> Josh >> >>
why not flink delete the checkpoint directory recursively?
Hi, When a checkpoint should be deleted, FsCompletedCheckpointStorageLocation.disposeStorageLocation will be called. Inside it, fs.delete(exclusiveCheckpointDir, false) does the delete action. I wonder why the recursive parameter is set to false, as the exclusiveCheckpointDir is truly a directory. In our hadoop, this means the checkpoint cannot be removed. It is easy to change the recursive parameter to true, but is there any potential harm? Yours sincerely Josh
The rpc invocation size exceeds the maximum akka framesize when the job was re submitted.
Hi, We have a flink job platform which resubmits a job when it fails, without platform user involvement. Today a resubmit failed because of the error below; I changed the akka.framesize and the resubmit succeeded. My question is: nothing changed about the job, the jar, the program, or the arguments, so why did the error suddenly happen? java.io.IOException: The rpc invocation size exceeds the maximum akka framesize. at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:247) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:196) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:125) at com.sun.proxy.$Proxy28.submitTask(Unknown Source) at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:99) at org.apache.flink.runtime.executiongraph.Execution.deploy(Execution.java:614) at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$2(ExecutionGraph.java:970) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:542) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:774) at akka.dispatch.OnComplete.internal(Future.scala:259) at akka.dispatch.OnComplete.internal(Future.scala:256) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:19) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18) at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:434) at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:433) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) at
akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Thanks Joshua
Re: Fencing token exceptions from Job Manager High Availability mode
Sorry to forget the version, we run flink 1.7 on yarn in a ha mode. On Fri, Oct 11, 2019 at 12:02 PM Joshua Fan wrote: > Hi Till > > After got your advice, I checked the log again. It seems not wholely the > same as the condition you mentioned. > > I would like to summarize the story in the belowed log. > > Once a time, the zk connection was not stable, so there happened 3 times > suspended-reconnected. > > After the first suspended-reconnected, the Minidispatcher tried to recover > all jobs. > > Then the second suspended-reconnected came, after this reconnected, there > happened a 'The heartbeat of JobManager with id > dbad79e0173c5658b029fba4d70e8084 timed out', and in this turn, the > Minidispatcher didn't try to recover the job. > > Due to the zk connection did not recover, the third suspended-reconnected > came, after the zk reconnected, the Minidispatcher did not try to recover > job ,but just repeated throw FencingTokenException, the AM was hanging, our > alarm-system just > found the job was gone, but can not get a final state of the job. And the > FencingTokenException was ongoing for nearly one day long before we killed > the AM. > > the whole log is attached. > > Thanks > > Joshua > > On Fri, Oct 11, 2019 at 10:35 AM Hanson, Bruce > wrote: > >> Hi Till and Fabian, >> >> >> >> My apologies for taking a week to reply; it took some time to reproduce >> the issue with debug logging. I’ve attached logs from a two minute period >> when the problem happened. I’m just sending this to you two to avoid >> sending the log file all over the place. If you’d like to have our >> conversation in the user group mailing list, that’s fine. >> >> >> >> The job was submitted by using the job manager REST api starting at >> 20:33:46.262 and finishing at 20:34:01.547. This worked normally, and the >> job started running. We then run a monitor that polls the /overview >> endpoint of the JM REST api. This started polling at 20:34:31.380 and >> resulted in the JM throwing the FencingTokenException at 20:34:31:393, and >> the JM returned a 500 to our monitor. This will happen every time we poll >> until the monitor times out and then we tear down the cluster, even though >> the job is running, we can’t tell that it is. This is somewhat rare, >> happening maybe 5% of the time. >> >> >> >> We’re running Flink 1.7.1. This issue only happens when we run in Job >> Manager High Availability mode. We provision two Job Managers, a 3-node >> zookeeper cluster, task managers and our monitor all in their own >> Kubernetes namespace. I can send you Zookeeper logs too if that would be >> helpful. >> >> >> >> Thanks in advance for any help you can provide! >> >> >> >> -Bruce >> >> -- >> >> >> >> >> >> *From: *Till Rohrmann >> *Date: *Wednesday, October 2, 2019 at 6:10 AM >> *To: *Fabian Hueske >> *Cc: *"Hanson, Bruce" , "user@flink.apache.org" < >> user@flink.apache.org> >> *Subject: *Re: Fencing token exceptions from Job Manager High >> Availability mode >> >> >> >> Hi Bruce, are you able to provide us with the full debug logs? From the >> excerpt itself it is hard to tell what is going on. >> >> >> >> Cheers, >> >> Till >> >> >> >> On Wed, Oct 2, 2019 at 2:24 PM Fabian Hueske wrote: >> >> Hi Bruce, >> >> >> >> I haven't seen such an exception yet, but maybe Till (in CC) can help. >> >> >> >> Best, >> >> Fabian >> >> >> >> Am Di., 1. Okt. 2019 um 05:51 Uhr schrieb Hanson, Bruce < >> bruce.han...@here.com>: >> >> Hi all, >> >> >> >> We are running some of our Flink jobs with Job Manager High Availability. 
>> Occasionally we get a cluster that comes up improperly and doesn’t respond. >> Attempts to submit the job seem to hang and when we hit the /overview REST >> endpoint in the Job Manager we get a 500 error and a fencing token >> exception like this: >> >> >> >> *2019-09-21 05:04:07.785 [flink-akka.actor.default-dispatcher-4428] >> level=ERROR o.a.f.runtime.rest.handler.cluster.ClusterOverviewHandler - >> Implementation error: Unhandled exception.* >> >> *org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing >> token not set: Ignoring message LocalFencedMessage(null, >> LocalRpcInvocation(requestResourceOverview(Time))) sent to &
Re: Fencing token exceptions from Job Manager High Availability mode
Hi Till After got your advice, I checked the log again. It seems not wholely the same as the condition you mentioned. I would like to summarize the story in the belowed log. Once a time, the zk connection was not stable, so there happened 3 times suspended-reconnected. After the first suspended-reconnected, the Minidispatcher tried to recover all jobs. Then the second suspended-reconnected came, after this reconnected, there happened a 'The heartbeat of JobManager with id dbad79e0173c5658b029fba4d70e8084 timed out', and in this turn, the Minidispatcher didn't try to recover the job. Due to the zk connection did not recover, the third suspended-reconnected came, after the zk reconnected, the Minidispatcher did not try to recover job ,but just repeated throw FencingTokenException, the AM was hanging, our alarm-system just found the job was gone, but can not get a final state of the job. And the FencingTokenException was ongoing for nearly one day long before we killed the AM. the whole log is attached. Thanks Joshua On Fri, Oct 11, 2019 at 10:35 AM Hanson, Bruce wrote: > Hi Till and Fabian, > > > > My apologies for taking a week to reply; it took some time to reproduce > the issue with debug logging. I’ve attached logs from a two minute period > when the problem happened. I’m just sending this to you two to avoid > sending the log file all over the place. If you’d like to have our > conversation in the user group mailing list, that’s fine. > > > > The job was submitted by using the job manager REST api starting at > 20:33:46.262 and finishing at 20:34:01.547. This worked normally, and the > job started running. We then run a monitor that polls the /overview > endpoint of the JM REST api. This started polling at 20:34:31.380 and > resulted in the JM throwing the FencingTokenException at 20:34:31:393, and > the JM returned a 500 to our monitor. This will happen every time we poll > until the monitor times out and then we tear down the cluster, even though > the job is running, we can’t tell that it is. This is somewhat rare, > happening maybe 5% of the time. > > > > We’re running Flink 1.7.1. This issue only happens when we run in Job > Manager High Availability mode. We provision two Job Managers, a 3-node > zookeeper cluster, task managers and our monitor all in their own > Kubernetes namespace. I can send you Zookeeper logs too if that would be > helpful. > > > > Thanks in advance for any help you can provide! > > > > -Bruce > > -- > > > > > > *From: *Till Rohrmann > *Date: *Wednesday, October 2, 2019 at 6:10 AM > *To: *Fabian Hueske > *Cc: *"Hanson, Bruce" , "user@flink.apache.org" < > user@flink.apache.org> > *Subject: *Re: Fencing token exceptions from Job Manager High > Availability mode > > > > Hi Bruce, are you able to provide us with the full debug logs? From the > excerpt itself it is hard to tell what is going on. > > > > Cheers, > > Till > > > > On Wed, Oct 2, 2019 at 2:24 PM Fabian Hueske wrote: > > Hi Bruce, > > > > I haven't seen such an exception yet, but maybe Till (in CC) can help. > > > > Best, > > Fabian > > > > Am Di., 1. Okt. 2019 um 05:51 Uhr schrieb Hanson, Bruce < > bruce.han...@here.com>: > > Hi all, > > > > We are running some of our Flink jobs with Job Manager High Availability. > Occasionally we get a cluster that comes up improperly and doesn’t respond. 
> Attempts to submit the job seem to hang and when we hit the /overview REST > endpoint in the Job Manager we get a 500 error and a fencing token > exception like this: > > > > *2019-09-21 05:04:07.785 [flink-akka.actor.default-dispatcher-4428] > level=ERROR o.a.f.runtime.rest.handler.cluster.ClusterOverviewHandler - > Implementation error: Unhandled exception.* > > *org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing > token not set: Ignoring message LocalFencedMessage(null, > LocalRpcInvocation(requestResourceOverview(Time))) sent to > akka.tcp://fl...@job-ef80a156-3350-4e85-8761-b0e42edc346f-jm-0.job-ef80a156-3350-4e85-8761-b0e42edc346f-jm-svc.olp-here-test-j-ef80a156-3350-4e85-8761-b0e42edc346f.svc.cluster.local:6126/user/resourcemanager > because the fencing token is null.* > > *at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:59)* > > *at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)* > > *at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)* > > *at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)* > > *at akka.actor.Actor$class.aroundReceive(Actor.scala:502)* > > *at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)* > > *at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)* > > *at akka.actor.ActorCell.invoke(ActorCell.scala:495)* > > *at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)* > > *at
Re: Difference between data stream window function and cep within
Hi Dian, Thank you for your explanation. After having a look at the source code, I see that cep within does not build a window at all; it is just a time interval that is checked against the state of each partial match. Thank you. Yours sincerely Joshua On Wed, Sep 18, 2019 at 9:41 AM Dian Fu wrote: > Hi Joshua, > > There is no tumbling/sliding window underlying the cep within > implementation. > > The difference between datastream window and cep within is that: > 1) Regarding the datastream window, the window is unified for all the > elements (You can think that the window already exists before the input > elements come). For example, for a sliding window (window size: 60s, slide > size: 10s), the windows will be [0s, 60s], [10s, 70s], [20s, 80s], > etc. When the input elements come, they are put into the windows they > belong to. > 2) Regarding cep within, it defines the maximum time interval for an > event sequence to match the pattern. So a unified window is not suitable > for this requirement. Regarding the underlying implementation, for each > matching/partial-matching sequence, the time interval between the first > element and the last element of the sequence will be checked against the > within interval. You can refer to [1] for details. > > Regards, > Dian > > [1] > https://github.com/apache/flink/blob/459fd929399ad6c80535255eefa278564ec33683/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java#L251 > > > On Sep 17, 2019, at 7:47 PM, Joshua Fan wrote: > > Hi All, > > I'd like to know the difference between the data stream window functions and > cep within; I googled this but found no useful information. > > Underneath cep within, is there a tumbling window, a sliding window, or just > a process function? > > Your explanation will be truly appreciated. > > Yours sincerely > > Joshua > > >
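To make the difference concrete, here is a small self-contained sketch that puts the two side by side; the tuple input, the key, and the 60s/10s sizes are only illustrative and are not taken from the original thread.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowVsCepWithin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Long>> events =
                env.fromElements(Tuple2.of("user-1", 1L), Tuple2.of("user-1", 2L));

        // 1) DataStream window: the windows exist independently of the data,
        //    e.g. [0s, 60s], [10s, 70s], ...; each element is assigned to the windows it falls into.
        events.keyBy(0)
              .window(SlidingProcessingTimeWindows.of(Time.seconds(60), Time.seconds(10)))
              .sum(1)
              .print();

        // 2) CEP within: no pre-built window; for every partial or complete match the gap
        //    between the first and the last matched event is checked against the 60s interval.
        Pattern<Tuple2<String, Long>, ?> pattern =
                Pattern.<Tuple2<String, Long>>begin("first")
                        .where(new SimpleCondition<Tuple2<String, Long>>() {
                            @Override
                            public boolean filter(Tuple2<String, Long> e) {
                                return e.f1 == 1L;
                            }
                        })
                        .next("second")
                        .where(new SimpleCondition<Tuple2<String, Long>>() {
                            @Override
                            public boolean filter(Tuple2<String, Long> e) {
                                return e.f1 == 2L;
                            }
                        })
                        .within(Time.seconds(60));
        CEP.pattern(events.keyBy(0), pattern);

        env.execute("window-vs-cep-within");
    }
}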
Difference between data stream window function and cep within
Hi All, I'd like to know the difference between the data stream window functions and cep within; I googled this but found no useful information. Underneath cep within, is there a tumbling window, a sliding window, or just a process function? Your explanation will be truly appreciated. Yours sincerely Joshua
Re: Maybe a flink bug. Job keeps in FAILING state
Hi Zhijiang Thank you for your analysis. I agree with it. The solution may be to let tm exit like you mentioned when any type of oom occurs, because the flink has no control on a tm when a oom occurs. I fired a jira before, https://issues.apache.org/jira/browse/FLINK-12889. Don't know it is worth to fix. Thank you all. Yours sincerely Joshua On Fri, Jun 21, 2019 at 5:32 PM zhijiang wrote: > Thanks for the reminding @Chesnay Schepler . > > I just looked throught the related logs. Actually all the five > "Source: ServiceLog" tasks are not in terminal state on JM view, the > relevant processes are as follows: > > 1. The checkpoint in task causes OOM issue which would call > `Task#failExternally` as a result, we could see the log "Attempting to > fail task externally" in tm. > 2. The source task would transform state from RUNNING to FAILED and then > starts a canceler thread for canceling task, we could see log "Triggering > cancellation of task" in tm. > 3. When JM starts to cancel the source tasks, the rpc call > `Task#cancelExecution` would find the task was already in FAILED state as > above step 2, we could see log "Attempting to cancel task" in tm. > > At last all the five source tasks are not in terminal states from jm log, > I guess the step 2 might not create canceler thread successfully, because > the root failover was caused by OOM during creating native thread in step1, > so it might exist possibilities that createing canceler thread is not > successful as well in OOM case which is unstable. If so, the source task > would not been interrupted at all, then it would not report to JM as well, > but the state is already changed to FAILED before. > > For the other vertex tasks, it does not trigger `Task#failExternally` in > step 1, and only receives the cancel rpc from JM in step 3. And I guess at > this time later than the source period, the canceler thread could be > created succesfully after some GCs, then these tasks could be canceled as > reported to JM side. > > I think the key problem is under OOM case some behaviors are not within > expectations, so it might bring problems. Maybe we should handle OOM error > in extreme way like making TM exit to solve the potential issue. > > Best, > Zhijiang > > -- > From:Chesnay Schepler > Send Time:2019年6月21日(星期五) 16:34 > To:zhijiang ; Joshua Fan < > joshuafat...@gmail.com> > Cc:user ; Till Rohrmann > Subject:Re: Maybe a flink bug. Job keeps in FAILING state > > The logs are attached to the initial mail. > > Echoing my thoughts from earlier: from the logs it looks as if the TM > never even submits the terminal state RPC calls for several tasks to the JM. > > On 21/06/2019 10:30, zhijiang wrote: > Hi Joshua, > > If the tasks(subtask 1/5,subtask 2/5,subtask 3/5,subtask 5/5) were really > in CANCELED state on TM side, but in CANCELING state on JM side, then it > might indicates the terminal state RPC was not received by JM. I am not > sure whether the OOM would cause this issue happen resulting in unexpected > behavior. > > In addition, you mentioned these tasks are still active after OOM and was > called to cancel, so I am not sure what is the specific periods for your > attached TM stack. I think it might provide help if you could provide > corresponding TM log and JM log. > From TM log it is easy to check the task final state. > > Best, > Zhijiang > -- > From:Joshua Fan > Send Time:2019年6月20日(星期四) 11:55 > To:zhijiang > Cc:user ; Till Rohrmann > ; Chesnay Schepler > > Subject:Re: Maybe a flink bug. 
Job keeps in FAILING state > > zhijiang > > I did not capture the job ui, the topology is in FAILING state, but the > persistentbolt subtasks as can be seen in the picture attached in first > mail was all canceled, and the parsebolt subtasks as described before had > one subtask FAILED, other subtasks CANCELED, but the source subtasks had > one subtask(subtask 4/5) CANCELED, and other subtasks(subtask 1/5,subtask > 2/5,subtask 3/5,subtask 5/5) CANCELING, not in a terminal state. > > The subtask status described above is in jm view, but in tm view, all of > the source subtask was in FAILED, do not know why jm was not notify about > this. > > As all of the failed status was triggered by a oom by the subtask can not > create native thread when checkpointing, I also dumped the stack of the > jvm, it shows the four subtasks(subtask 1/5,subtask 2/5,subtask 3/5,subtask > 5/5) are still active after it throwed a oom and was called to cancel . I > attached the jstack file in this em
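As a side note for readers of this thread: the "make the TM exit on OOM" idea discussed above can be illustrated with a plain-JVM sketch (this is not Flink code; the exit code and the handler placement are arbitrary). Recent Flink versions also seem to expose a taskmanager.jvm-exit-on-oom option for the same purpose.

public class ExitOnFatalError {

    /** Installs a last-resort handler that kills the JVM when an Error escapes a thread. */
    public static void install() {
        Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
            if (throwable instanceof Error) {
                // halt() rather than exit(): shutdown hooks may need to spawn threads,
                // which is exactly what fails on "unable to create new native thread".
                Runtime.getRuntime().halt(-17);
            }
        });
    }

    public static void main(String[] args) {
        install();
        // From here on, an OutOfMemoryError escaping any thread terminates the process
        // immediately, so the container is restarted instead of lingering half-broken.
    }
}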
Re: run python job with flink 1.7
when I look into the log file, it turned out that flink cannot get the plan when create the plan file, full log message is below. 107 2019-05-17 12:24:56.950 [main] ERROR org.apache.flink.python.api.PythonPlanBinder - Failed to run plan. 108 java.lang.RuntimeException: Plan file caused an error. Check log-files for details. 109 at org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.preparePlanMode(PythonPlanStreamer.java:107) 110 at org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:178) 111 at org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:98) 112 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 113 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 114 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 115 at java.lang.reflect.Method.invoke(Method.java:498) 116 at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) 117 at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) 118 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427) 119 at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813) 120 at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287) 121 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) 122 at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) 123 at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) 124 at java.security.AccessController.doPrivileged(Native Method) 125 at javax.security.auth.Subject.doAs(Subject.java:422) 126 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762) 127 at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) 128 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) 129 2019-05-17 12:24:56.951 [main] INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint. 130 2019-05-17 12:24:56.959 [main] INFO org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete. I am not familiar with python. Thanks for your help. On Fri, May 17, 2019 at 11:47 AM Joshua Fan wrote: > Hi all > > When I run the python example in flink 1.7, it always got a excepthon. > > The command is: ./bin/pyflink.sh ./examples/python/streaming/word_count.py > > The return message is: > 2019-05-17 11:43:22,900 INFO org.apache.hadoop.yarn.client.RMProxy > - Connecting to ResourceManager at > data01.hj.shbt.qihoo.net/10.203.82.17:8832 > Starting execution of program > Traceback (most recent call last): > File "/tmp/flink_plan_0a1aed4b-4155-4cfa-b2ba-8083a4a61f6e/plan.py", > line 21, in > from org.apache.flink.api.common.functions import FlatMapFunction > ImportError: No module named org.apache.flink.api.common.functions > Failed to run plan: Plan file caused an error. Check log-files for details. > > The program didn't contain a Flink job. Perhaps you forgot to call > execute() on the execution environment. > > Can not find any help in google. > Appreciate your help very much. > > Sincerely > Joshua >
run python job with flink 1.7
Hi all, When I run the python example in Flink 1.7, it always throws an exception.
The command is: ./bin/pyflink.sh ./examples/python/streaming/word_count.py
The return message is:
2019-05-17 11:43:22,900 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at data01.hj.shbt.qihoo.net/10.203.82.17:8832
Starting execution of program
Traceback (most recent call last):
File "/tmp/flink_plan_0a1aed4b-4155-4cfa-b2ba-8083a4a61f6e/plan.py", line 21, in
from org.apache.flink.api.common.functions import FlatMapFunction
ImportError: No module named org.apache.flink.api.common.functions
Failed to run plan: Plan file caused an error. Check log-files for details.
The program didn't contain a Flink job. Perhaps you forgot to call execute() on the execution environment.
I could not find any help on Google. I appreciate your help very much. Sincerely Joshua
The submitting is hanging when register a hdfs file as registerCacheFile in 1.7 based on RestClusterClient
Hi all, As the title says, submission hangs whenever the registered cache file is not reachable, apparently because the RestClient uses a java.io.File to read the cache file. I use RestClusterClient to submit jobs in Flink 1.7. Below are the instructions shown at https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#distributed-cache :
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
Unfortunately, neither of the two examples can be submitted: because neither hdfs:///path/to/your/file nor file:///path/to/exec/file is reachable through java.io.File, the HTTP POST never finishes and the submission hangs. When I use env.registerCachedFile("/path/to/exec/file", "localExecFile", true) with a plain local path, the job can be submitted and the cache file is available. Is there a problem in my code, or should I file a JIRA? Yours Joshua
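Until the REST client can read remote URIs itself, one possible workaround is to localize the HDFS file on the client with Flink's own FileSystem abstraction and register the local copy instead. A rough sketch, assuming the Hadoop configuration is visible to the client (the helper name is made up):

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

public class CacheFileWorkaround {

    /** Copies a possibly remote file (hdfs://, file://, ...) to a local temp file. */
    public static String localizeForCache(String uri) throws Exception {
        Path remote = new Path(uri);
        FileSystem fs = remote.getFileSystem();
        File local = File.createTempFile("flink-cache-", "-" + remote.getName());
        try (FSDataInputStream in = fs.open(remote)) {
            Files.copy(in, local.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }
        return local.getAbsolutePath();
    }
}

// usage, reusing the env from the snippet above:
// env.registerCachedFile(CacheFileWorkaround.localizeForCache("hdfs:///path/to/your/file"), "hdfsFile");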
Re: There is no classloader in TableFactoryUtil using ConnectTableDescriptor to registerTableSource
Hi Hequn Thanks. Now I know what you mean. To use tableEnv.registerTableSource instead of using StreamTableDescriptor.registerTableSource. Yes, it is a good solution. If the StreamTableDescriptor itself can use a user-defined classloader, it is better. Thank you. Yours sincerely Joshua On Wed, Jan 16, 2019 at 10:24 AM Joshua Fan wrote: > Hi Hequn > > Yes, the TableFactoryService has a proper method. As I > use StreamTableDescriptor to connect to Kafka, StreamTableDescriptor > actually uses ConnectTableDescriptor which calls TableFactoryUtil to do > service load, and TableFactoryUtil does not use a user defined classloader, > so I can not use `TableFactoryService.find(StreamTableSourceFactory.class, > streamTableDescriptor, classloader)` in StreamTableDescriptor directly. > > One solution for me is: > 1.add method to TableFactoryUtil to use user defined classloader. > 2.add method to ConnectTableDescriptor accordingly. > 3.add method to StreamTableDescriptor accordingly. > > But I wonder if there is a current solution to register TableSource from > StreamTableDescriptor using user defined classloader. > > Your sincerely > Joshua > > On Tue, Jan 15, 2019 at 8:26 PM Hequn Cheng wrote: > >> Hi Joshua, >> >> Could you use `TableFactoryService` directly to register TableSource? The >> code looks like: >> >> final TableSource tableSource = >>> TableFactoryService.find(StreamTableSourceFactory.class, >>> streamTableDescriptor, classloader) >>> .createStreamTableSource(propertiesMap); >>> tableEnv.registerTableSource(name, tableSource); >> >> >> Best, Hequn >> >> On Tue, Jan 15, 2019 at 6:43 PM Joshua Fan >> wrote: >> >>> Hi >>> >>> As known, TableFactoryService has many methods to find a suitable >>> service to load. Some of them use a user defined classloader, the others >>> just uses the default classloader. >>> >>> Now I use ConnectTableDescriptor to registerTableSource in the >>> environment, which uses TableFactoryUtil to load service, but >>> TableFactoryUtil just use the default classloader, it is not enough in my >>> case. Because the user may use kafka 0.8 or 0.9, the jars can not be put >>> together in the lib directory. >>> >>> Is there a proper way to use ConnectTableDescriptor to >>> registerTableSource at a user defined classloader? >>> >>> I know SQL Client has their now implementation to avoid >>> use TableFactoryUtil, but I think TableFactoryUtil itself should also >>> provide a method to use user defined classloader. >>> >>> Yours sincerely >>> Joshhua >>> >>
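For readers landing on this thread, here is a rough sketch of the approach Hequn describes, pulled together into one helper; the jar path, table name, and exact signatures are best-effort assumptions rather than verified 1.7 code.

import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Descriptor;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.sources.TableSource;

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;

public class UserClassLoaderRegistration {

    /** Resolves the factory against a caller-supplied classloader and registers the source. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static void register(StreamTableEnvironment tableEnv,
                                String name,
                                Descriptor streamTableDescriptor,
                                Map<String, String> propertiesMap,
                                ClassLoader userClassLoader) {
        TableSource tableSource = TableFactoryService
                .find(StreamTableSourceFactory.class, streamTableDescriptor, userClassLoader)
                .createStreamTableSource(propertiesMap);
        tableEnv.registerTableSource(name, tableSource);
    }

    /** Builds a classloader that can see, e.g., the Kafka 0.8 connector jar. */
    public static ClassLoader connectorClassLoader(String jarPath) throws Exception {
        return new URLClassLoader(
                new URL[]{new File(jarPath).toURI().toURL()},
                Thread.currentThread().getContextClassLoader());
    }
}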
Re: There is no classloader in TableFactoryUtil using ConnectTableDescriptor to registerTableSource
Hi Hequn Yes, the TableFactoryService has a proper method. As I use StreamTableDescriptor to connect to Kafka, StreamTableDescriptor actually uses ConnectTableDescriptor which calls TableFactoryUtil to do service load, and TableFactoryUtil does not use a user defined classloader, so I can not use `TableFactoryService.find(StreamTableSourceFactory.class, streamTableDescriptor, classloader)` in StreamTableDescriptor directly. One solution for me is: 1.add method to TableFactoryUtil to use user defined classloader. 2.add method to ConnectTableDescriptor accordingly. 3.add method to StreamTableDescriptor accordingly. But I wonder if there is a current solution to register TableSource from StreamTableDescriptor using user defined classloader. Your sincerely Joshua On Tue, Jan 15, 2019 at 8:26 PM Hequn Cheng wrote: > Hi Joshua, > > Could you use `TableFactoryService` directly to register TableSource? The > code looks like: > > final TableSource tableSource = >> TableFactoryService.find(StreamTableSourceFactory.class, >> streamTableDescriptor, classloader) >> .createStreamTableSource(propertiesMap); >> tableEnv.registerTableSource(name, tableSource); > > > Best, Hequn > > On Tue, Jan 15, 2019 at 6:43 PM Joshua Fan wrote: > >> Hi >> >> As known, TableFactoryService has many methods to find a suitable service >> to load. Some of them use a user defined classloader, the others just uses >> the default classloader. >> >> Now I use ConnectTableDescriptor to registerTableSource in the >> environment, which uses TableFactoryUtil to load service, but >> TableFactoryUtil just use the default classloader, it is not enough in my >> case. Because the user may use kafka 0.8 or 0.9, the jars can not be put >> together in the lib directory. >> >> Is there a proper way to use ConnectTableDescriptor to >> registerTableSource at a user defined classloader? >> >> I know SQL Client has their now implementation to avoid >> use TableFactoryUtil, but I think TableFactoryUtil itself should also >> provide a method to use user defined classloader. >> >> Yours sincerely >> Joshhua >> >
There is no classloader in TableFactoryUtil using ConnectTableDescriptor to registerTableSource
Hi, As is known, TableFactoryService has several methods to find a suitable factory to load. Some of them take a user-defined classloader; the others just use the default classloader. I currently use ConnectTableDescriptor to registerTableSource in the environment, which goes through TableFactoryUtil to load the factory, but TableFactoryUtil only uses the default classloader, and that is not enough in my case: a user may use Kafka 0.8 or 0.9, so the connector jars cannot all be put together in the lib directory. Is there a proper way to use ConnectTableDescriptor to registerTableSource with a user-defined classloader? I know the SQL Client has its own implementation that avoids TableFactoryUtil, but I think TableFactoryUtil itself should also provide a method that accepts a user-defined classloader. Yours sincerely Joshua
Re: NoMatchingTableFactoryException when test flink sql with kafka in flink 1.7
Hi Zhenghua, Yes, the topic was polluted somehow. After I created a new topic to consume from, it is OK now. Yours sincerely Joshua On Tue, Jan 15, 2019 at 4:28 PM Zhenghua Gao wrote: > Maybe you're generating non-standard JSON records. > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >
Re: NoMatchingTableFactoryException when test flink sql with kafka in flink 1.7
Hi Timo Thank you for your advice. It is truely a typo. After I fix it, the same exception remains. But when I add the inAppendMode() to the StreamTableDescriptor, the exception disappears, and it can find the proper kafka08factory. And another exception turns out. Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unexpected character ('-' (code 45)): Expected space separating root-level values at [Source: [B@69e1cfbe; line: 1, column: 6] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:450) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:466) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1657) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1394) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:852) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2355) at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:94) But actually, I produced the json data to the topic, why flink can not deserialize it? It is weird. Yours Joshua On Fri, Jan 11, 2019 at 11:02 PM Timo Walther wrote: > Hi Jashua, > > according to the property list, you passed "connector.version=0.10" so a > Kafka 0.8 factory will not match. > > Are you sure you are compiling the right thing? There seems to be a > mismatch between your screenshot and the exception. > > Regards, > Timo > > Am 11.01.19 um 15:43 schrieb Joshua Fan: > > Hi, > > I want to test flink sql locally by consuming kafka data in flink 1.7, but > it turns out an exception like below. > > Exception in thread "main" >>> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find >>> a suitable table factory for >>> 'org.apache.flink.table.factories.StreamTableSourceFactory' in >> >> the classpath. >> >> >>> Reason: No context matches. 
>> >> >>> The following properties are requested: >> >> connector.properties.0.key=fetch.message.max.bytes >> >> connector.properties.0.value=10485760 >> >> connector.properties.1.key=zookeeper.connect >> >> connector.properties.1.value=10.xxx.:2181/kafka >> >> connector.properties.2.key=group.id >> >> connector.properties.2.value=d4b53966-796e-4a2d-b6eb-a5c489db2b21 >> >> connector.properties.3.key=bootstrap.servers >> >> connector.properties.3.value=10.xxx:9092 >> >> connector.property-version=1 >> >> connector.startup-mode=latest-offset >> >> connector.topic=-flink-test >> >> connector.type=kafka >> >> connector.version=0.10 >> >> format.derive-schema=true >> >> format.property-version=1 >> >> format.type=json >> >> schema.0.name=rideId >> >> schema.0.type=VARCHAR >> >> schema.1.name=lon >> >> schema.1.type=VARCHAR >> >> >>> The following factories have been considered: >> >> org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory >> >> org.apache.flink.table.sources.CsvBatchTableSourceFactory >> >> org.apache.flink.table.sources.CsvAppendTableSourceFactory >> >> org.apache.flink.table.sinks.CsvBatchTableSinkFactory >> >> org.apache.flink.table.sinks.CsvAppendTableSinkFactory >> >> org.apache.flink.formats.json.JsonRowFormatFactory >> >> >>> at >>> org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214) >> >> at >>> org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130) >> >> at >>> org.apache.fl
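For anyone else hitting NoMatchingTableFactoryException here: the two changes that made the factory match in this thread were declaring the update mode on the descriptor and keeping the declared version in line with the connector jar on the classpath. A condensed sketch, reusing the stEnv, properties, and schema from the original post below:

// same descriptor as in the original code, with the update mode declared
stEnv.connect(new Kafka()
            .version("0.8")            // must match the connector jar (flink-connector-kafka-0.8)
            .topic("flink-test")
            .properties(properties)
            .startFromLatest())
     .withSchema(new Schema()
            .field("rideId", Types.STRING())
            .field("lon", Types.STRING()))
     .withFormat(new Json().deriveSchema())
     .inAppendMode()                   // the piece that was missing initially
     .registerTableSource("test");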
NoMatchingTableFactoryException when test flink sql with kafka in flink 1.7
Hi, I want to test flink sql locally by consuming kafka data in flink 1.7, but it turns out an exception like below. Exception in thread "main" >> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find >> a suitable table factory for >> 'org.apache.flink.table.factories.StreamTableSourceFactory' in > > the classpath. > > >> Reason: No context matches. > > >> The following properties are requested: > > connector.properties.0.key=fetch.message.max.bytes > > connector.properties.0.value=10485760 > > connector.properties.1.key=zookeeper.connect > > connector.properties.1.value=10.xxx.:2181/kafka > > connector.properties.2.key=group.id > > connector.properties.2.value=d4b53966-796e-4a2d-b6eb-a5c489db2b21 > > connector.properties.3.key=bootstrap.servers > > connector.properties.3.value=10.xxx:9092 > > connector.property-version=1 > > connector.startup-mode=latest-offset > > connector.topic=-flink-test > > connector.type=kafka > > connector.version=0.10 > > format.derive-schema=true > > format.property-version=1 > > format.type=json > > schema.0.name=rideId > > schema.0.type=VARCHAR > > schema.1.name=lon > > schema.1.type=VARCHAR > > >> The following factories have been considered: > > org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory > > org.apache.flink.table.sources.CsvBatchTableSourceFactory > > org.apache.flink.table.sources.CsvAppendTableSourceFactory > > org.apache.flink.table.sinks.CsvBatchTableSinkFactory > > org.apache.flink.table.sinks.CsvAppendTableSinkFactory > > org.apache.flink.formats.json.JsonRowFormatFactory > > >> at >> org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214) > > at >> org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130) > > at >> org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81) > > at >> org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49) > > at >> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46) > > at TableSourceFinder.main(TableSourceFinder.java:40) > > here is my code: public static void main(String[] args) throws Exception{ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment stEnv = TableEnvironment.getTableEnvironment(env); Kafka kafka = new Kafka(); Properties properties = new Properties(); String zkString = "10.xxx:2181/kafka"; String brokerList = "10.xxx:9092"; properties.setProperty("fetch.message.max.bytes", "10485760"); properties.setProperty("group.id", UUID.randomUUID().toString()); properties.setProperty("zookeeper.connect", zkString); properties.setProperty("bootstrap.servers", brokerList); kafka.version("0.8").topic("flink-test").properties(properties); kafka.startFromLatest(); stEnv.connect(kafka).withSchema( new Schema() .field("rideId", Types.STRING()) .field("lon", Types.STRING())) .withFormat(new Json().deriveSchema()) .registerTableSource("test"); Table table = stEnv.sqlQuery("select rideId from test"); DataStream ds = ((org.apache.flink.table.api.java.StreamTableEnvironment) stEnv). 
toAppendStream(table, Types.STRING()); ds.print(); env.execute("KafkaSql"); }

And here are the relevant dependencies in my pom.xml:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>

In my opinion, I have all the needed libraries in the pom, so I don't know why it fails when testing locally. Thank you for any hints. Yours Joshua
Re: Using port ranges to connect with the Flink Client
Hi Chesnay, Yes, RestClusterClient is what we use in our company with Flink 1.7. It can do almost everything except fetch the ClusterOverview when I want summary information about a session cluster. In the end I manually issue an HTTP GET request against the cluster to get it. It would be good if RestClusterClient could provide a similar interface. Yours Joshua On Fri, Jan 4, 2019 at 5:28 PM Gyula Fóra wrote: > Hi, > > Thanks Chesnay my problem was fixed it was related to enabling port ranges > for the rest client it turned out. > > Gyula > > On Fri, 4 Jan 2019 at 10:26, Chesnay Schepler wrote: > >> @Gyula: From what I can tell your custom client is still relying on >> akka, and should be using the RestClusterClient instead. >> >> @Joshua: Are you by chance using the ClusterClient directly? Unless >> you're working with legacy clusters, for 1.5+ you should use the >> RestClusterClient instead. >> >> On 03.01.2019 08:32, Joshua Fan wrote: >> > Hi, Gyula >> > >> > I met a similar situation. >> > >> > We used flink 1.4 before, and everything is ok. >> > >> > Now, we upgrade to flink 1.7 and use non-legacy mode, there seems >> > something not ok, it all refers to that it is impossible get the >> > jobmanagerGateway at client side. When I create a cluster without a >> > job, I describe the cluster, flink will throw the same exception as >> > you pointed out. When I submit a job, I want to trigger a savepoint at >> > client side, it will also throw the same exception. >> > >> > Don't know why in non-legacy mode,flink will not write back the leader >> > info into zookeeper in the path of >> > /flink/app_9_000/leader/0/job _manager_lock. This causes >> > all the operations fail when using the jobmanagerGateway method in >> > ClusterClient. >> > >> > Hope someone can explain how to do this in a non-legacy mode. >> > >> > Yours sincerely >> > Joshua >> >> >>
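For completeness, the manual HTTP GET mentioned above is just a call to the same REST endpoint the web UI uses; a minimal sketch with a made-up host and port:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class ClusterOverviewFetcher {

    /** Fetches the JSON cluster summary from e.g. "http://jobmanager-host:8081". */
    public static String fetchOverview(String restAddress) throws Exception {
        HttpURLConnection conn =
                (HttpURLConnection) new URL(restAddress + "/overview").openConnection();
        conn.setRequestMethod("GET");
        StringBuilder body = new StringBuilder();
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
        }
        return body.toString();  // JSON summary: taskmanagers, slots, running/finished jobs, ...
    }
}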
Re: no log exists in JM and TM when updated to Flink 1.7
Hi Till I found the root cause why log-not-show when use logback, because flink does not include the logback-*.jar in the lib folder. After I put the logback jar file in lib, everything is ok now. I think flink should put the logback jar files into the lib directory, not just the log4j jar file, because the both log configuration file exist in conf directory. Yours sincerely Joshua On Wed, Jan 2, 2019 at 10:08 PM Till Rohrmann wrote: > Hi Joshua, > > could you check the content of the logback.xml. Maybe this file has > changed between the versions. > > Cheers, > Till > > On Wed, Dec 26, 2018 at 11:19 AM Joshua Fan > wrote: > >> Hi, >> >> It is very weird that there is no log file for JM and TM when run flink >> job on yarn after updated flink to 1.7.on Flink 1.4.2, everything is OK. I >> checked the log directory, there were jobmanager.error and jobmanager.out, >> but without jobmanager.log, but the log message which should exist in >> jobmanager.log now shows up in jobmanager.error. The taskmanager has the >> same situation, no taskmanager.log but information exists in >> taskmanager.error. >> >> below is the container lauch shell, and it seems ok. >> >> yarn 21784 0.0 0.0 109896 1480 ?Ss 15:13 0:00 >> /bin/bash -c /home/yarn/software/java8/bin/java -Xmx424m >> -Dlog.file=/data03/yarn/userlogs/application_1543893582405_1760130/container_e124_1543893582405_1760130_02_01/jobmanager.log >> -Dlogback.configurationFile=file:logback.xml >> org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint 1> >> /data03/yarn/userlogs/application_1543893582405_1760130/container_e124_1543893582405_1760130_02_01/jobmanager.out >> 2> >> /data03/yarn/userlogs/application_1543893582405_1760130/container_e124_1543893582405_1760130_02_01/ >> >> any hints?Thanks a lot. >> >> Yours >> Joshua >> >
Re: Using port ranges to connect with the Flink Client
Hi Gyula, I ran into a similar situation. We used Flink 1.4 before and everything was fine. Now we have upgraded to Flink 1.7 and use non-legacy mode, and something seems wrong: it all comes down to the fact that it is impossible to get the jobmanagerGateway on the client side. When I create a cluster without a job and then describe the cluster, Flink throws the same exception you pointed out. When I submit a job and then want to trigger a savepoint from the client side, it also throws the same exception. I don't know why, in non-legacy mode, Flink does not write the leader info back into ZooKeeper under the path /flink/app_9_000/leader/0/job _manager_lock. This causes all operations that use the jobmanagerGateway method in ClusterClient to fail. I hope someone can explain how to do this in non-legacy mode. Yours sincerely Joshua
Re: Can't list logs or stdout through web console on Flink 1.7 Kubernetes
I found the root cause why log-not-show when use logback, because flink does not include the logback-*.jar in the lib folder. After I put the logback jar file in lib, everything is ok now. On Fri, Dec 28, 2018 at 10:41 PM Chesnay Schepler wrote: > @Steven: Do you happen do know whether a JIRA exists for this issue? > > @Joshua: Does this also happen if you use log4j? > > On 26.12.2018 11:33, Joshua Fan wrote: > > wow, I met similar situation using flink 1.7 on yarn. > > there was no jobmanager.log on the node but jobmanager.out and > jobmanager.error, and jobmanager.error contains the log message. so , there > was nothing in the webUI. > > I do not know why this happened. by the way, I used logback to do log > staff. > > On Thu, Dec 20, 2018 at 12:50 AM Steven Nelson > wrote: > >> There is a known issue for this I believe. The problem is that the >> containerized versions of Flink output logs to STDOUT instead of files >> inside the node. If you pull use docker logs on the container you can see >> what you’re looking for. I use the Kube dashboard to view the logs >> centrally. >> >> Sent from my iPhone >> >> > On Dec 19, 2018, at 9:40 AM, William Saar wrote: >> > >> > >> > I'm running Flink 1.7 in ECS, is this a known issue or should I create >> a jira? >> > >> > The web console doesn't show anything when trying to list logs or >> stdout for task managers and the job manager log have stack traces for the >> errors >> > 2018-12-19 15:35:53,498 ERROR >> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler >> - Failed to transfer file from TaskExecutor >> d7fd266047d5acfaddeb1156bdb23ff3. >> > java.util.concurrent.CompletionException: >> org.apache.flink.util.FlinkException: The file STDOUT is not available on >> the TaskExecutor. >> > >> > 2018-12-19 15:36:02,538 ERROR >> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler >> - Failed to transfer file from TaskExecutor >> d7fd266047d5acfaddeb1156bdb23ff3. >> > java.util.concurrent.CompletionException: >> org.apache.flink.util.FlinkException: The file LOG is not available on the >> TaskExecutor. >> > >> > >
Re: Can't list logs or stdout through web console on Flink 1.7 Kubernetes
wow, I met similar situation using flink 1.7 on yarn. there was no jobmanager.log on the node but jobmanager.out and jobmanager.error, and jobmanager.error contains the log message. so , there was nothing in the webUI. I do not know why this happened. by the way, I used logback to do log staff. On Thu, Dec 20, 2018 at 12:50 AM Steven Nelson wrote: > There is a known issue for this I believe. The problem is that the > containerized versions of Flink output logs to STDOUT instead of files > inside the node. If you pull use docker logs on the container you can see > what you’re looking for. I use the Kube dashboard to view the logs > centrally. > > Sent from my iPhone > > > On Dec 19, 2018, at 9:40 AM, William Saar wrote: > > > > > > I'm running Flink 1.7 in ECS, is this a known issue or should I create a > jira? > > > > The web console doesn't show anything when trying to list logs or stdout > for task managers and the job manager log have stack traces for the errors > > 2018-12-19 15:35:53,498 ERROR > org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler > - Failed to transfer file from TaskExecutor > d7fd266047d5acfaddeb1156bdb23ff3. > > java.util.concurrent.CompletionException: > org.apache.flink.util.FlinkException: The file STDOUT is not available on > the TaskExecutor. > > > > 2018-12-19 15:36:02,538 ERROR > org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler > - Failed to transfer file from TaskExecutor > d7fd266047d5acfaddeb1156bdb23ff3. > > java.util.concurrent.CompletionException: > org.apache.flink.util.FlinkException: The file LOG is not available on the > TaskExecutor. > > >
no log exists in JM and TM when updated to Flink 1.7
Hi, It is very weird that there are no log files for the JM and TM when running a Flink job on YARN after updating Flink to 1.7; on Flink 1.4.2, everything is OK. I checked the log directory: there were jobmanager.error and jobmanager.out, but no jobmanager.log, and the log messages that should be in jobmanager.log now show up in jobmanager.error. The TaskManager has the same situation: no taskmanager.log, but the information exists in taskmanager.error. Below is the container launch command, and it seems OK.
yarn 21784 0.0 0.0 109896 1480 ?Ss 15:13 0:00 /bin/bash -c /home/yarn/software/java8/bin/java -Xmx424m -Dlog.file=/data03/yarn/userlogs/application_1543893582405_1760130/container_e124_1543893582405_1760130_02_01/jobmanager.log -Dlogback.configurationFile=file:logback.xml org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint 1> /data03/yarn/userlogs/application_1543893582405_1760130/container_e124_1543893582405_1760130_02_01/jobmanager.out 2> /data03/yarn/userlogs/application_1543893582405_1760130/container_e124_1543893582405_1760130_02_01/
Any hints? Thanks a lot. Yours Joshua
Weird behavior in actorSystem shutdown in akka
Hi, Till and users, There is a weird behavior in actorSystem shutdown in akka of our flink platform. We use flink 1.4.2 on yarn as our flink deploy mode, and we use an ongoing agent to submit flink job to yarn which is based on YarnClient. User can connect to the agent to submit job and disconnect, but the agent is always there. So, each time the user submit a job there would be a ActorSystem created, after the job submitted in detached mode successfully, the ActorSystem would be shutdown. The weird thing is that there always an akka error message turn out in jm log after 2 days( 2 day is the default value in akka of quarantine-after-silence), like below. 2018-11-19 09:30:34.212 [flink-akka.actor.default-dispatcher-2] ERROR akka.remote.Remoting flink-akka.remote.default-remote-dispatcher-5 - Association to [akka.tcp://fl...@client01v.xxx:35767] with UID [-1757115446] irrecoverably failed. Quarantining address. java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours) at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at akka.actor.ActorCell.invoke(ActorCell.scala:495) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) In the above, the client01v*** is the host node where runs the agent, and the above error turns out randomly. We trigger a savepoint in the agent every half hour, it means the actorSystem will be created and shutdown accordingly. But only 1 of 50 chance the shutdown will raise a error like above. I think maybe it refer to the akka system. I checked the akka code, found some clues as below. for those there is no error raised in two days, the log in jm like this: 2018-11-17 04:31:09.208 [flink-akka.actor.default-dispatcher-17] DEBUG akka.remote.transport.ProtocolStateActor flink-akka.remote.default-remote-dispatcher-23 - Association between local [tcp://flink@:29448] and remote [tcp://flink@:56906] was disassociated because the ProtocolStateActor failed: Shutdown 2018-11-17 04:31:09.208 [flink-akka.actor.default-dispatcher-17] DEBUG akka.remote.transport.ProtocolStateActor flink-akka.remote.default-remote-dispatcher-23 - Association between local [tcp://flink@:29448] and remote [tcp://flink@:56906] was disassociated because the ProtocolStateActor failed: Shutdown 2018-11-17 04:31:09.209 [flink-akka.actor.default-dispatcher-17] DEBUG akka.remote.Remoting flink-akka.remote.default-remote-dispatcher-15 - Remote system with address [akka.tcp://flink@:41769] has shut down. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. 2018-11-17 04:31:09.209 [flink-akka.actor.default-dispatcher-17] DEBUG akka.remote.Remoting flink-akka.remote.default-remote-dispatcher-15 - Remote system with address [akka.tcp://flink@:41769] has shut down. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. 
It seems the remote actor receives the shutdown proposal, the akka message may flow like below: 1.The agent shut down the actorSystem 2.The EndpointReader in jm receives an AssociationHandle. Shutdown and EndpointReader just throws it as a ShutDownAssociation, and the EndpointWriter will publishAndthrow the ShutDownAssociation again. 2.when the ReliableDeliverySupervisor in jm gets an AssociationProblem reported by the EndpointWriter, it also throw it out. 3.when the EndpointManager in jm gets the ShutDownAssociation exception, the EndpointManager would stop the actor. but for the one which will raised the silent error , the log in jm like this, seems the remote actor did not receives the shutdown proposal: 2018-11-17 09:30:29.177 [flink-akka.actor.default-dispatcher-4] DEBUG akka.remote.transport.ProtocolStateActor flink-akka.remote.default-remote-dispatcher-14 - Association between local [tcp://flink@:29448] and remote [tcp://flink@:45103] was disassociated because the ProtocolStateActor failed: Unknown 2018-11-17 09:30:29.177 [flink-akka.actor.default-dispatcher-4] DEBUG akka.remote.transport.ProtocolStateActor flink-akka.remote.default-remote-dispatcher-14 - Association between local [tcp://flink@:29448] and remote [tcp://flink@:45103] was disassociated because the ProtocolStateActor failed: Unknown 2018-11-17 09:30:29.177 [flink-akka.actor.default-dispatcher-4] WARN
Re: Get nothing from TaskManager in UI
Hi Vino, the version is 1.4.2. Yours Joshua On Tue, Oct 23, 2018 at 7:26 PM vino yang wrote: > Hi Joshua, > > Which version of Flink are you using? > > Thanks, vino. > > Joshua Fan 于2018年10月23日周二 下午5:58写道: > >> Hi All >> >> came into new situations, that the UI can show metric data but the data >> remains the same all the time after days. So, there are two cases, one is >> no data in UI at all, another is dead data in UI all the time. >> >> when dig into the taskmanager.log, taskmanager.error, taskmanager.out, >> there is nothing unnormal found. >> >> anyone can give some hints? >> >> Yours sincerely >> Joshua >> >> On Wed, Oct 17, 2018 at 5:05 PM Joshua Fan >> wrote: >> >>> Hi,all >>> >>> Frequently, for some cluster, there is no data from Task Manager in UI, >>> as the picture shows below. >>> [image: tm-hang.png] >>> but the cluster and the job is running well, just no metrics can be got. >>> anything can do to improve this? >>> >>> Thanks for your assistance. >>> >>> Your sincerely >>> Joshua >>> >>
Re: Get nothing from TaskManager in UI
Hi All, I have run into a new situation: the UI can show metric data, but the data stays the same for days. So there are two cases: one is no data in the UI at all, the other is stale data in the UI all the time. When digging into taskmanager.log, taskmanager.error and taskmanager.out, nothing abnormal was found. Can anyone give some hints? Yours sincerely Joshua On Wed, Oct 17, 2018 at 5:05 PM Joshua Fan wrote: > Hi all, > > Frequently, for some clusters, there is no data from the TaskManagers in the UI, > as the picture below shows. > [image: tm-hang.png] > But the cluster and the job are running well; we just cannot get any metrics. > Is there anything we can do to improve this? > > Thanks for your assistance. > > Yours sincerely > Joshua >
Re: Checkpointing when one of the sources has completed
Hi Niels, Probably not: an operator does not start a checkpoint until it has received barriers from all of its upstream inputs, so if one source can no longer send a barrier, the downstream operators cannot checkpoint. FYI. Yours sincerely Joshua On Wed, Oct 17, 2018 at 4:58 PM Niels van Kaam wrote: > Hi All, > > I am debugging an issue where the periodic checkpointing has halted. I > noticed that one of the sources of my job has completed (finished). The > other sources and operators would however still be able to produce output. > > Does anyone know if Flink's periodic checkpoints are supposed to continue > when one or more sources of a job are in the "FINISHED" state? > > Cheers, > Niels > >
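To illustrate the point above in code: once a bounded source returns from run(), its task goes to FINISHED and stops forwarding barriers, so periodic checkpoints stall downstream. Below is a hedged sketch of the usual workaround, a source that emits its finite input and then idles instead of finishing (names and data are made up):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

/** Emits a finite data set but never finishes, so checkpoint barriers keep flowing. */
public class NeverFinishingSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // emit the actual (finite) data
        for (String value : new String[]{"a", "b", "c"}) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(value);
            }
        }
        // instead of returning (which would move the task to FINISHED and stop
        // barriers from this source), idle until the job is cancelled
        while (running) {
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}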
Get nothing from TaskManager in UI
Hi all, Frequently, for some clusters, there is no data from the TaskManagers in the UI, as the picture below shows. [image: tm-hang.png] But the cluster and the job are running well; we just cannot get any metrics. Is there anything we can do to improve this? Thanks for your assistance. Yours sincerely Joshua
How to submit a job with dependency jars by flink cli in Flink 1.4.2?
Hi, I'd like to submit a job together with its dependency jars via flink run, but it fails. Here is the command:
/usr/bin/hadoop/software/flink-1.4.2/bin/flink run \
-m yarn-cluster -yn 1 -ys 8 -yjm 2148 -ytm 4096 -ynm jarsTest \
-c StreamExample \
-C file:/home/work/xxx/lib/commons-math3-3.5.jar \
-C file:/home/work/xxx/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar \
... xxx-1.0.jar
As described in https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/cli.html#usage , "-C" is meant to provide the dependency jars. After I execute the command, the job is submitted successfully but cannot run in the Flink cluster on YARN. The exception looks like this:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
ClassLoader info: URL ClassLoader:
file: '/home/work/xxx/lib/commons-math3-3.5.jar' (missing)
file: '/home/work/xxx/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar' (missing)
... Class not resolvable through given classloader.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:232)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:95)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
It appears that the two dependency jars cannot be found on the TaskManager, so I dug into the source code, from CliFrontend to PackagedProgram to ClusterClient to JobGraph. It seems the dependency jars are put on the classpath and into the userCodeClassLoader in PackagedProgram, but are never uploaded to the BlobServer via the JobGraph, where xxx-1.0.jar is uploaded. Am I missing something? Are dependency jars simply not supported in Flink 1.4.2? I hope someone can give me a hint. I appreciate it very much. Yours sincerely Joshua
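Judging from the analysis above, the jars passed with -C only end up in the client-side classpath and userCodeClassLoader, never in the JobGraph's user-jar list, which is what gets uploaded to the BlobServer. If you build the JobGraph yourself before handing it to the ClusterClient, the missing step would look roughly like the sketch below; this is not the actual CLI code path, and the paths are simply the ones from the example above. Bundling everything into one fat jar, or shipping the lib directory with the CLI's YARN ship option, are other ways around it.

import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;

import java.util.List;

public final class UserJarAttacher {

    /** Adds dependency jars to the JobGraph so they are shipped to the BlobServer with the job. */
    public static void attachUserJars(JobGraph jobGraph, List<String> jarUris) {
        for (String uri : jarUris) {
            jobGraph.addJar(new Path(uri));
            // e.g. "file:///home/work/xxx/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar"
        }
    }
}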