Re: Mapstate got wrong UK when restored.

2021-12-31 Thread Joshua Fan
Hi David,

Thanks a lot.
I think I get the point now. When I use initializeState to restore the MapState,
no key is in scope at that moment, so I only see the operator key but not
the UK. When I use the MapState in processElement, a key is provided
implicitly, so I get the right UK and UV. But I still think I should
get the (UK, UV) pairs in initializeState, not the (key, UV) ones. Anyway, I
have changed my code to just use the MapState provided by Flink.
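For later readers, here is a minimal sketch of that last point, i.e. letting Flink manage the MapState inside a KeyedProcessFunction. The class name, type parameters and the per-minute bookkeeping below are illustrative assumptions, not the code of the actual job:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative only: the key/value types and the per-minute counting are assumptions.
public class MyKeyedFunction extends KeyedProcessFunction<String, String, String> {

    // UK = minute timestamp, UV = count, purely as an example.
    private transient MapState<Long, Long> entityStates;

    @Override
    public void open(Configuration parameters) {
        MapStateDescriptor<Long, Long> descriptor =
                new MapStateDescriptor<>("entity-states", Long.class, Long.class);
        // Flink snapshots and restores this state by itself; there is no need to
        // override snapshotState/initializeState or to copy entries into a java.util.Map.
        entityStates = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // A key is implicitly in scope here, so get/put/keys() only touch this key's map.
        Long ts = ctx.timestamp();
        long minute = ts == null ? 0L : ts - (ts % 60_000L);
        Long old = entityStates.get(minute);
        entityStates.put(minute, old == null ? 1L : old + 1L);
        out.collect(value);
    }
}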
Thanks.

Yours sincerely
Josh


On Wed, Dec 29, 2021 at 11:01 PM David Morávek  wrote:

> The problem is that you're not actually using the underlying state during
> runtime, but instead you're simply using a java map abstraction. This
> property ("Map state") is simply bound to the UDF lifecycle
> and doesn't share the semantics of the keyed state.
>
> You should be using the "MapState" property directly to get the guarantees
> you're looking for. Then you also won't need to override the snapshot /
> initialize state methods, which simplifies the code a lot.
>
> D.
>
> On Wed, Dec 29, 2021 at 2:08 PM Joshua Fan  wrote:
>
>> Hi David,
>> Thanks for you reply.
>> Yes, for keyed state, every state is referenced by a particular key, but
>> I would guess it is a flink sdk issue, I mean,  the keyed state maybe saved
>> as (key,  keyed state), as for my situation, it is (key, mapstate(UK,UV)),
>> I think the key of this pair is not easy to get by user, when I do
>> mapstate.keyset I want to get the UK set, not the key set. According to my
>> job, the (key, mapstate(UK,UV)) can be get successfully when job is
>> running, but when job restarts from a checkpoint, the restored mapstate,
>> the pair seemed be changed to (key, UV), the UK just gone, I can not find
>> back the UK. I think the key of (key, mapstate(UK,UV)) will be implictly
>> added when write or read from the state by flink.
>> So, I am still not clear why I get the key but not the UK.
>>
>> Yours
>> Josh
>>
>> On Wed, Dec 29, 2021 at 5:32 PM David Morávek  wrote:
>>
>>> Hi Josh,
>>>
>>> the important bit to understand is that the MapState (or any other
>>> keyed state) is scoped per *key* [1]. You can think of it as having a
>>> separate "map" that backs the state for each key. This is an important
>>> concept behind distributed stream processing: it allows you to
>>> parallelize the computation and still make sure that all data for the
>>> same key ends up in the same partition.
>>>
>>> Does this answer your question?
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/#keyed-state
>>>
>>> Best,
>>> D.
>>>
>>


Re: Mapstate got wrong UK when restored.

2021-12-29 Thread Joshua Fan
Hi David,
Thanks for your reply.
Yes, for keyed state, every state is referenced by a particular key, but I
would guess this is a Flink SDK issue. I mean, the keyed state may be saved
as (key, keyed state); in my situation it is (key, mapstate(UK, UV)). I
think the key of this pair is not easy for the user to get: when I call
mapstate.keySet I want to get the UK set, not the key set. In my job, the
(key, mapstate(UK, UV)) pairs can be read successfully while the job is
running, but when the job restarts from a checkpoint, the restored mapstate
pairs seem to have changed to (key, UV); the UK is just gone and I cannot
get it back. I think the key of (key, mapstate(UK, UV)) is implicitly
added by Flink when writing to or reading from the state.
So I am still not clear why I get the key but not the UK.

Yours
Josh

On Wed, Dec 29, 2021 at 5:32 PM David Morávek  wrote:

> Hi Josh,
>
> the important bit to understand is that the MapState (or any other keyed
> state) is scoped per *key* [1]. You can think of it as having a separate
> "map" that backs the state for each key. This is an important concept
> behind distributed stream processing: it allows you to parallelize the
> computation and still make sure that all data for the same key ends up
> in the same partition.
>
> Does this answer your question?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/#keyed-state
>
> Best,
> D.
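To make the scoping above concrete, here is a toy model in plain Java (no Flink APIs, all names made up) of why the same MapState field shows different contents for different keys, and why its keys() returns the UK set rather than the operator keys:

import java.util.HashMap;
import java.util.Map;

// Toy model only: keyed MapState behaves roughly like a map of maps, where the
// outer key is the operator key and the inner map is the (UK, UV) map that
// MapState exposes for whichever key is currently being processed.
public class KeyedMapStateToyModel {
    public static void main(String[] args) {
        Map<String, Map<Long, String>> keyedState = new HashMap<>();

        // Processing an element for key "dolphin": put UK=1640679600 into dolphin's map.
        keyedState.computeIfAbsent("dolphin", k -> new HashMap<>()).put(1640679600L, "d1");
        // Processing an element for key "crocodile": its map is completely separate.
        keyedState.computeIfAbsent("crocodile", k -> new HashMap<>()).put(1640679660L, "c1");

        // What MapState#keys() would show while processing a "dolphin" element:
        System.out.println(keyedState.get("dolphin").keySet()); // [1640679600] -> the UKs
        // The operator keys ("dolphin", "crocodile") live at a different level entirely.
        System.out.println(keyedState.keySet()); // [dolphin, crocodile]
    }
}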
>


Mapstate got wrong UK when restored.

2021-12-28 Thread Joshua Fan
Hi all,
My Flink version is 1.11 and the state backend is RocksDB. I want to write
a Flink job that implements an adaptive window. I wrote a Flink DAG like the one below:

> DataStream<DataEntity> entities = env.addSource(new
> EntitySource()).setParallelism(1);
>
> entities.keyBy(DataEntity::getName).process(new
> EntityKeyedProcessFunction()).setParallelism(p);
>
> The important code is the EntityKeyedProcessFunction; it is attached. I
have a MapState in it, declared like 'private transient MapState
entityStates;'
I print the content of the MapState when a checkpoint completes, and the content
looks correct, like below:

> 2021-12-28 16:22:05,487 INFO window.EntityKeyedProcessFunction [] -
> >the key is 164067960
> 2021-12-28 16:22:05,487 INFO window.EntityKeyedProcessFunction [] - the
> value is name = hippopotamus, value = [window.DataEntity@b9266a0, window.
> DataEntity@682fce90]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the
> value is name = crocodile, value = [window.DataEntity@16d20045]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the
> value is name = dolphin, value = [window.DataEntity@1820c75a, window.
> DataEntity@64f3b9f6]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Dragonfly, value = [window.DataEntity@2b2ad03]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Hedgehog, value = [window.DataEntity@65f39671, window.
> DataEntity@2df6b2bf]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Bee, value = [window.DataEntity@13249998]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the
> value is name = cicada, value = [window.DataEntity@7266e125, window.
> DataEntity@167cf1ae]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Mosquito, value = [window.DataEntity@2596aa5a, window.
> DataEntity@603c0804]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Crane, value = [window.DataEntity@2a3192e9, window.
> DataEntity@3a65398f]
>
The key of the mapstate is a time which was coalesced to minute.

But when the job restarted from a checkpoint, the content of the MapState
changed; actually, the key of the MapState changed. It shows as below.

> 2021-12-28 16:15:45,379 INFO window.EntityKeyedProcessFunction [] -
> >the key is carp
> 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the
> value is name = hippopotamus, value = [window.DataEntity@510d4c4b, window.
> DataEntity@7857e387]
> 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the
> value is name = crocodile, value = [window.DataEntity@31366a33, window.
> DataEntity@a62074a]
> 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Dragonfly, value = [window.DataEntity@56db63fa, window.
> DataEntity@54befce0, window.DataEntity@4e7cf96a]
> 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Hedgehog, value = [window.DataEntity@7ad09313, window.
> DataEntity@592a2955]
> 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Mosquito, value = [window.DataEntity@48c05cae]
> 2021-12-28 16:15:45,392 INFO window.EntityKeyedProcessFunction [] - the
> value is name = duck, value = [window.DataEntity@3e9ef1a4]
> 2021-12-28 16:15:45,392 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Rhinoceros, value = [window.DataEntity@25f11701, window.
> DataEntity@2334b667]
> 2021-12-28 16:15:45,392 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Eagle, value = [window.DataEntity@7574eb4a]
>
It seems like the key of the restored MapState is the key of the operator.
My minute timestamp is gone, replaced by the key of the operator.
It is weird. Am I misusing the MapState?
Thanks.

Yours
Josh


EntityKeyedProcessFunction.java
Description: Binary data


Re: custom flink image error

2021-08-06 Thread Joshua Fan
Finally, I worked out how to build a custom Flink image. The Dockerfile is
just:
>
> FROM  flink:1.13.1-scala_2.11
> ADD ./flink-s3-fs-hadoop-1.13.1.jar /opt/flink/plugins
> ADD ./flink-s3-fs-presto-1.13.1.jar /opt/flink/plugins
>

The wrong Dockerfile was:

>  FROM  apache/flink:1.13.1-scala_2.11

ADD ./flink-s3-fs-hadoop-1.13.1.jar /opt/flink/plugins

ADD ./flink-s3-fs-presto-1.13.1.jar /opt/flink/plugins

It uses the wrong base image.
I don't know why apache/flink:1.13.1-scala_2.11 behaves so differently from
flink:1.13.1-scala_2.11; I have no idea where the apache prefix comes from. Hope
you all are doing well.

On Thu, Aug 5, 2021 at 11:42 AM Joshua Fan  wrote:

> It seems I set a wrong  high-availability.storageDir,
> s3://flink-test/recovery can work, but  s3:///flink-test/recovery can not,
> one / be removed.
>
> On Thu, Aug 5, 2021 at 10:43 AM Joshua Fan  wrote:
>
>> Hi Robert, Tobias
>>
>> I have tried many ways to build and validate the image.
>>
>> 1.put the s3 dependency to plugin subdirectory, the Dockerfile content is
>> below:
>>
>>> FROM apache/flink:1.13.1-scala_2.11
>>> ADD ./flink-s3-fs-hadoop-1.13.1.jar
>>> /opt/flink/plugins/s3-hadoop/flink-s3-fs-hadoop-1.13.1.jar
>>> ADD ./flink-s3-fs-presto-1.13.1.jar
>>> /opt/flink/plugins/s3-presto/flink-s3-fs-presto-1.13.1.jar
>>>
>> This time the image can be run on  k8s but would also hit a error like
>> "org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
>> find a file system implementation for scheme 's3'.", it seems like flink
>> can not find the s3 filesystem supports dynamically. When I want to run the
>> image using 'docker run -it ', it would also report
>> 'standard_init_linux.go:190: exec user process caused "exec format error"'.
>>
>> 2.put the s3 dependency to plugin directly, the Dockerfile content is
>> below:
>>
>>> FROM apache/flink:1.13.1-scala_2.11
>>> ADD ./flink-s3-fs-hadoop-1.13.1.jar /opt/flink/plugins
>>> ADD ./flink-s3-fs-presto-1.13.1.jar /opt/flink/plugins
>>>
>> The image can not run on the k8s and report error just the same  as  run
>> the image using 'docker run -it ',  'standard_init_linux.go:190: exec user
>> process caused "exec format error"'.
>>
>> 3.just run the community edition image flink:1.13.1-scala_2.11 locally as
>> docker run -it, it will also hit the same error
>> 'standard_init_linux.go:190: exec user process caused "exec format error"',
>> but the flink:1.13.1-scala_2.11 can be run on the k8s without s3
>> requirement.
>>
>> 4.import the s3 dependency as a kubernetes parameter
>> I submit the session with '
>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.13.0.jar
>> \
>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=
>> flink-s3-fs-hadoop-1.13.0.jar', the session can be start, but report
>> error as below
>>
>>> Caused by: java.lang.NullPointerException: null uri host.
>>> at java.util.Objects.requireNonNull(Objects.java:228)
>>> at
>>> org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:72)
>>> at
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.setUri(S3AFileSystem.java:467)
>>> at
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:234)
>>> at
>>> org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:123)
>>>
>> but I have set the s3 staff in the flink-conf.yaml as below:
>>
>>> high-availability:
>>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>
>> high-availability.storageDir: s3:///flink-test/recovery
>>
>> s3.endpoint: http://xxx.yyy.zzz.net
>>
>> s3.path.style.access: true
>>
>> s3.access-key: 111
>>
>> s3.secret-key: 222
>>
>> I think I supplied all the s3 information in the flink-conf.yaml, but it
>> did not work.
>>
>> I will try other ways to complete the s3 ha on k8s. Thank your guys.
>>
>> Yours sincerely
>> Joshua
>>
>> On Wed, Aug 4, 2021 at 11:35 PM Robert Metzger  wrote:
>>
>>> Hey Joshua,
>>>
>>> Can you first validate if the docker image you've built is valid by
>>> running it locally on your machine?
>>>
>>> I would recommend putting the s3 filesystem files into the plugins [1]
>>> directory to avoid classloading issues.
>>> Also, you don't need to build custom images if you want to use built-in plugins [2]

Re: custom flink image error

2021-08-04 Thread Joshua Fan
It seems I set a wrong high-availability.storageDir: s3://flink-test/recovery
works, but s3:///flink-test/recovery does not; one / has to be removed.
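The single slash matters because the bucket is taken from the URI's host/authority part; with s3:/// the authority is empty, which is exactly what the "null uri host" NullPointerException quoted below complains about. A small, self-contained illustration using plain java.net.URI (only to mimic the check that Hadoop's S3 code performs):

import java.net.URI;

public class S3UriHostCheck {
    public static void main(String[] args) {
        // Three slashes: the authority is empty, so there is no bucket/host to connect to.
        System.out.println(URI.create("s3:///flink-test/recovery").getHost()); // null
        // Two slashes: "flink-test" is parsed as the host, i.e. the bucket.
        System.out.println(URI.create("s3://flink-test/recovery").getHost());  // flink-test
    }
}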

On Thu, Aug 5, 2021 at 10:43 AM Joshua Fan  wrote:

> Hi Robert, Tobias
>
> I have tried many ways to build and validate the image.
>
> 1.put the s3 dependency to plugin subdirectory, the Dockerfile content is
> below:
>
>> FROM apache/flink:1.13.1-scala_2.11
>> ADD ./flink-s3-fs-hadoop-1.13.1.jar
>> /opt/flink/plugins/s3-hadoop/flink-s3-fs-hadoop-1.13.1.jar
>> ADD ./flink-s3-fs-presto-1.13.1.jar
>> /opt/flink/plugins/s3-presto/flink-s3-fs-presto-1.13.1.jar
>>
> This time the image can be run on  k8s but would also hit a error like
> "org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 's3'.", it seems like flink
> can not find the s3 filesystem supports dynamically. When I want to run the
> image using 'docker run -it ', it would also report
> 'standard_init_linux.go:190: exec user process caused "exec format error"'.
>
> 2.put the s3 dependency to plugin directly, the Dockerfile content is
> below:
>
>> FROM apache/flink:1.13.1-scala_2.11
>> ADD ./flink-s3-fs-hadoop-1.13.1.jar /opt/flink/plugins
>> ADD ./flink-s3-fs-presto-1.13.1.jar /opt/flink/plugins
>>
> The image can not run on the k8s and report error just the same  as  run
> the image using 'docker run -it ',  'standard_init_linux.go:190: exec user
> process caused "exec format error"'.
>
> 3.just run the community edition image flink:1.13.1-scala_2.11 locally as
> docker run -it, it will also hit the same error
> 'standard_init_linux.go:190: exec user process caused "exec format error"',
> but the flink:1.13.1-scala_2.11 can be run on the k8s without s3
> requirement.
>
> 4.import the s3 dependency as a kubernetes parameter
> I submit the session with '
> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.13.0.jar
> \
> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=
> flink-s3-fs-hadoop-1.13.0.jar', the session can be start, but report
> error as below
>
>> Caused by: java.lang.NullPointerException: null uri host.
>> at java.util.Objects.requireNonNull(Objects.java:228)
>> at
>> org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:72)
>> at
>> org.apache.hadoop.fs.s3a.S3AFileSystem.setUri(S3AFileSystem.java:467)
>> at
>> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:234)
>> at
>> org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:123)
>>
> but I have set the s3 staff in the flink-conf.yaml as below:
>
>> high-availability:
>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>
> high-availability.storageDir: s3:///flink-test/recovery
>
> s3.endpoint: http://xxx.yyy.zzz.net
>
> s3.path.style.access: true
>
> s3.access-key: 111
>
> s3.secret-key: 222
>
> I think I supplied all the s3 information in the flink-conf.yaml, but it
> did not work.
>
> I will try other ways to complete the s3 ha on k8s. Thank your guys.
>
> Yours sincerely
> Joshua
>
> On Wed, Aug 4, 2021 at 11:35 PM Robert Metzger  wrote:
>
>> Hey Joshua,
>>
>> Can you first validate if the docker image you've built is valid by
>> running it locally on your machine?
>>
>> I would recommend putting the s3 filesystem files into the plugins [1]
>> directory to avoid classloading issues.
>> Also, you don't need to build custom images if you want to use build-in
>> plugins [2]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/plugins/
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#using-plugins
>>
>> On Wed, Aug 4, 2021 at 3:06 PM Joshua Fan  wrote:
>>
>>> Hi All
>>> I want to build a custom flink image to run on k8s, below is my
>>> Dockerfile content:
>>>
>>>> FROM apache/flink:1.13.1-scala_2.11
>>>> ADD ./flink-s3-fs-hadoop-1.13.1.jar /opt/flink/lib
>>>> ADD ./flink-s3-fs-presto-1.13.1.jar /opt/flink/lib
>>>>
>>> I just put the s3 fs dependency to the {flink home}/lib, and then I
>>> build the image and push it to the repo.
>>>
>>> When I submit the flink session from the custom image, a error will be
>>> reported like "exec /docker-entrypoint.sh failed: Exec format error".
>>>
>>> I googled a lot, but it seems no useful information.
>>>
>>> Thanks for your help.
>>>
>>> Yours sincerely
>>> Joshua
>>>
>>


Re: custom flink image error

2021-08-04 Thread Joshua Fan
Hi Robert, Tobias

I have tried many ways to build and validate the image.

1. Put the s3 dependency into a plugins subdirectory; the Dockerfile content is
below:

> FROM apache/flink:1.13.1-scala_2.11
> ADD ./flink-s3-fs-hadoop-1.13.1.jar
> /opt/flink/plugins/s3-hadoop/flink-s3-fs-hadoop-1.13.1.jar
> ADD ./flink-s3-fs-presto-1.13.1.jar
> /opt/flink/plugins/s3-presto/flink-s3-fs-presto-1.13.1.jar
>
This time the image can run on k8s but still hits an error like
"org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
find a file system implementation for scheme 's3'."; it seems Flink cannot
pick up the s3 filesystem support dynamically. When I try to run the
image using 'docker run -it', it also reports
'standard_init_linux.go:190: exec user process caused "exec format error"'.

2. Put the s3 dependency into the plugins directory directly; the Dockerfile content is below:

> FROM apache/flink:1.13.1-scala_2.11
> ADD ./flink-s3-fs-hadoop-1.13.1.jar /opt/flink/plugins
> ADD ./flink-s3-fs-presto-1.13.1.jar /opt/flink/plugins
>
The image cannot run on k8s and reports exactly the same error as running
the image with 'docker run -it': 'standard_init_linux.go:190: exec user
process caused "exec format error"'.

3. Just running the community edition image flink:1.13.1-scala_2.11 locally with
docker run -it also hits the same error
'standard_init_linux.go:190: exec user process caused "exec format error"',
but flink:1.13.1-scala_2.11 can run on k8s when s3 is not
required.

4. Import the s3 dependency via a Kubernetes parameter.
I submit the session with
'-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.13.0.jar
\
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.13.0.jar';
the session can be started, but it reports the error
below:

> Caused by: java.lang.NullPointerException: null uri host.
> at java.util.Objects.requireNonNull(Objects.java:228)
> at
> org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:72)
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.setUri(S3AFileSystem.java:467)
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:234)
> at
> org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:123)
>
but I have set the s3 stuff in flink-conf.yaml as below:

> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory

high-availability.storageDir: s3:///flink-test/recovery

s3.endpoint: http://xxx.yyy.zzz.net

s3.path.style.access: true

s3.access-key: 111

s3.secret-key: 222

I think I supplied all the s3 information in flink-conf.yaml, but it
did not work.

I will try other ways to get s3 HA working on k8s. Thank you, guys.

Yours sincerely
Joshua

On Wed, Aug 4, 2021 at 11:35 PM Robert Metzger  wrote:

> Hey Joshua,
>
> Can you first validate if the docker image you've built is valid by
> running it locally on your machine?
>
> I would recommend putting the s3 filesystem files into the plugins [1]
> directory to avoid classloading issues.
> Also, you don't need to build custom images if you want to use build-in
> plugins [2]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/plugins/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#using-plugins
>
> On Wed, Aug 4, 2021 at 3:06 PM Joshua Fan  wrote:
>
>> Hi All
>> I want to build a custom flink image to run on k8s, below is my
>> Dockerfile content:
>>
>>> FROM apache/flink:1.13.1-scala_2.11
>>> ADD ./flink-s3-fs-hadoop-1.13.1.jar /opt/flink/lib
>>> ADD ./flink-s3-fs-presto-1.13.1.jar /opt/flink/lib
>>>
>> I just put the s3 fs dependency to the {flink home}/lib, and then I build
>> the image and push it to the repo.
>>
>> When I submit the flink session from the custom image, a error will be
>> reported like "exec /docker-entrypoint.sh failed: Exec format error".
>>
>> I googled a lot, but it seems no useful information.
>>
>> Thanks for your help.
>>
>> Yours sincerely
>> Joshua
>>
>


custom flink image error

2021-08-04 Thread Joshua Fan
Hi all,
I want to build a custom Flink image to run on k8s; below is my Dockerfile
content:

> FROM apache/flink:1.13.1-scala_2.11
> ADD ./flink-s3-fs-hadoop-1.13.1.jar /opt/flink/lib
> ADD ./flink-s3-fs-presto-1.13.1.jar /opt/flink/lib
>
I just put the s3 fs dependency into {flink home}/lib, and then I built
the image and pushed it to the repo.

When I submit the Flink session from the custom image, an error is
reported like "exec /docker-entrypoint.sh failed: Exec format error".

I googled a lot, but it seems there is no useful information.

Thanks for your help.

Yours sincerely
Joshua


Re: SIGSEGV error

2021-05-18 Thread Joshua Fan
Hi Till,
I also tried the job without gzip; it ran into the same error.
But the problem is solved now. Just as I was about to give up on it, I found
the mail at
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/JVM-crash-SIGSEGV-in-ZIP-GetEntry-td17326.html.
So I think maybe it was something about the serialization stuff.
What I have done is:
before:

OperatorStateStore stateStore = context.getOperatorStateStore();
ListStateDescriptor<State> lsd = new ListStateDescriptor<>("bucket-states", State.class);

after:

OperatorStateStore stateStore = context.getOperatorStateStore();
ListStateDescriptor<State> lsd = new ListStateDescriptor<>("bucket-states", new JavaSerializer<>());
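For completeness, a self-contained sketch of the "after" shape inside a CheckpointedFunction. The element type and serializer below (String / StringSerializer) are stand-ins, because the JavaSerializer used above is an internal Flink class whose package can vary between versions; the point is only that the descriptor gets an explicit TypeSerializer instead of a Class:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class BucketStateHolder implements CheckpointedFunction {

    private transient ListState<String> bucketStates;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Passing an explicit serializer avoids the reflective serializer lookup
        // that was suspected of causing trouble in this thread.
        ListStateDescriptor<String> lsd =
                new ListStateDescriptor<>("bucket-states", StringSerializer.INSTANCE);
        bucketStates = context.getOperatorStateStore().getListState(lsd);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // The registered state is snapshotted by Flink; nothing extra is needed here.
    }
}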

Hope this is helpful.

Yours sincerely
Josh



On Tue, May 18, 2021 at 2:54 PM Till Rohrmann  wrote:

> Hi Joshua,
>
> could you try whether the job also fails when not using the gzip format?
> This could help us narrow down the culprit. Moreover, you could try to run
> your job and Flink with Java 11 now.
>
> Cheers,
> Till
>
> On Tue, May 18, 2021 at 5:10 AM Joshua Fan  wrote:
>
>> Hi all,
>>
>> Most of the posts says that "Most of the times, the crashes in
>> ZIP_GetEntry occur when the jar file being accessed has been
>> modified/overwritten while the JVM instance was running. ", but do not
>> know when and which jar file was modified according to the job running in
>> flink.
>>
>> for your information.
>>
>> Yours sincerely
>> Josh
>>
>> On Tue, May 18, 2021 at 10:15 AM Joshua Fan  wrote:
>>
>>> Hi Stephan, Till
>>>
>>> Recently, I tried to upgrade a flink job from 1.7 to 1.11,
>>> unfortunately, the weird problem appeared, " SIGSEGV (0xb) at
>>> pc=0x0025, pid=135306, tid=140439001388800".  The pid log is
>>> attached.
>>> Actually, it is a simple job that consumes messages from kafka and
>>> writes into hdfs with a gzip format. It can run in 1.11 for about 2
>>> minutes, then the JVM will crash, then job restart and jvm crash again
>>> until the application fails.
>>> I also tried to set -Dsun.zip.disableMemoryMapping=true,but it turns
>>> out helpless, the same crash keeps happening. Google suggests to upgrade
>>> jdk to jdk1.9, but it is not feasible.
>>> Any suggestions? Thanks a lot.
>>>
>>> Yours sincerely
>>> Josh
>>>
>>> On Fri, Sep 13, 2019 at 11:11 PM Stephan Ewen  wrote:
>>>
>>>> Given that the segfault happens in the JVM's ZIP stream code, I am
>>>> curious whether this is a bug in Flink or in the JVM core libs that happens to
>>>> be triggered now by newer versions of Flink.
>>>>
>>>> I found this on StackOverflow, which looks like it could be related:
>>>> https://stackoverflow.com/questions/38326183/jvm-crashed-in-java-util-zip-zipfile-getentry
>>>> Can you try the suggested option "-Dsun.zip.disableMemoryMapping=true"?
>>>>
>>>>
>>>> On Fri, Sep 13, 2019 at 11:36 AM Till Rohrmann 
>>>> wrote:
>>>>
>>>>> Hi Marek,
>>>>>
>>>>> could you share the logs statements which happened before the SIGSEGV
>>>>> with us? They might be helpful to understand what happened before.
>>>>> Moreover, it would be helpful to get access to your custom serializer
>>>>> implementations. I'm also pulling in Gordon who worked on
>>>>> the TypeSerializerSnapshot improvements.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Sep 12, 2019 at 9:28 AM Marek Maj  wrote:
>>>>>
>>>>>> Hi everyone,
>>>>>>
>>>>>> Recently we decided to upgrade from flink 1.7.2 to 1.8.1. After an
>>>>>> upgrade our task managers started to fail with SIGSEGV error from time to
>>>>>> time.
>>>>>>
>>>>>> In process of adjusting the code to 1.8.1, we noticed that there were
>>>>>> some changes around TypeSerializerSnapshot interface and its
>>>>>> implementations. At that time we had a few custom serializers which we
>>>>>> decided to throw out during migration and then leverage flink default
>>>>>> serializers. We don't mind clearing the state in the process of 
>>>>>> migration,
>>>>>> an effort to migrate with state seems to be not worth it.
>>>>>>
>>>>>> Unfortunately after running new version we see SIGSEGV errors from
>>>>>> time to time. It 

Re: SIGSEGV error

2021-05-17 Thread Joshua Fan
Hi all,

Most of the posts say that "most of the time, crashes in ZIP_GetEntry
occur when the jar file being accessed has been modified/overwritten while
the JVM instance was running", but I do not know when and which jar file
was modified for the job running in Flink.

For your information.

Yours sincerely
Josh

On Tue, May 18, 2021 at 10:15 AM Joshua Fan  wrote:

> Hi Stephan, Till
>
> Recently, I tried to upgrade a flink job from 1.7 to 1.11, unfortunately,
> the weird problem appeared, " SIGSEGV (0xb) at pc=0x0025,
> pid=135306, tid=140439001388800".  The pid log is attached.
> Actually, it is a simple job that consumes messages from kafka and writes
> into hdfs with a gzip format. It can run in 1.11 for about 2 minutes, then
> the JVM will crash, then job restart and jvm crash again until the
> application fails.
> I also tried to set -Dsun.zip.disableMemoryMapping=true,but it turns out
> helpless, the same crash keeps happening. Google suggests to upgrade jdk to
> jdk1.9, but it is not feasible.
> Any suggestions? Thanks a lot.
>
> Yours sincerely
> Josh
>
> On Fri, Sep 13, 2019 at 11:11 PM Stephan Ewen  wrote:
>
>> Given that the segfault happens in the JVM's ZIP stream code, I am
>> curious whether this is a bug in Flink or in the JVM core libs that happens to
>> be triggered now by newer versions of Flink.
>>
>> I found this on StackOverflow, which looks like it could be related:
>> https://stackoverflow.com/questions/38326183/jvm-crashed-in-java-util-zip-zipfile-getentry
>> Can you try the suggested option "-Dsun.zip.disableMemoryMapping=true"?
>>
>>
>> On Fri, Sep 13, 2019 at 11:36 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Marek,
>>>
>>> could you share the logs statements which happened before the SIGSEGV
>>> with us? They might be helpful to understand what happened before.
>>> Moreover, it would be helpful to get access to your custom serializer
>>> implementations. I'm also pulling in Gordon who worked on
>>> the TypeSerializerSnapshot improvements.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Sep 12, 2019 at 9:28 AM Marek Maj  wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> Recently we decided to upgrade from flink 1.7.2 to 1.8.1. After an
>>>> upgrade our task managers started to fail with SIGSEGV error from time to
>>>> time.
>>>>
>>>> In process of adjusting the code to 1.8.1, we noticed that there were
>>>> some changes around TypeSerializerSnapshot interface and its
>>>> implementations. At that time we had a few custom serializers which we
>>>> decided to throw out during migration and then leverage flink default
>>>> serializers. We don't mind clearing the state in the process of migration,
>>>> an effort to migrate with state seems to be not worth it.
>>>>
>>>> Unfortunately after running new version we see SIGSEGV errors from time
>>>> to time. It may be that serialization is not the real cause, but at the
>>>> moment it seems to be the most probable reason. We have not performed any
>>>> significant code changes besides serialization area.
>>>>
>>>> We run job on yarn, hdp version 2.7.3.2.6.2.0-205.
>>>> Checkpoint configuration: RocksDB backend, not incremental, 50s min
>>>> processing time
>>>>
>>>> You can find parts of JobManager log and ErrorFile log of failed
>>>> container included below.
>>>>
>>>> Any suggestions are welcome
>>>>
>>>> Best regards
>>>> Marek Maj
>>>>
>>>> jobmanager.log
>>>>
>>>> 019-09-10 16:30:28.177 INFO  o.a.f.r.c.CheckpointCoordinator   -
>>>> Completed checkpoint 47 for job c8a9ae03785ade86348c3189cf7dd965
>>>> (18532488122 bytes in 60871 ms).
>>>>
>>>> 2019-09-10 16:31:19.223 INFO  o.a.f.r.c.CheckpointCoordinator   -
>>>> Triggering checkpoint 48 @ 1568111478177 for job
>>>> c8a9ae03785ade86348c3189cf7dd965.
>>>>
>>>> 2019-09-10 16:32:19.280 INFO  o.a.f.r.c.CheckpointCoordinator   -
>>>> Completed checkpoint 48 for job c8a9ae03785ade86348c3189cf7dd965
>>>> (19049515705 bytes in 61083 ms).
>>>>
>>>> 2019-09-10 16:33:10.480 INFO  o.a.f.r.c.CheckpointCoordinator   -
>>>> Triggering checkpoint 49 @ 1568111589279 for job
>>>> c8a9ae03785ade86348c3189cf7dd965.
>>>>
>>>> 2019-09-10 16:33:36.773 WARN  o.a.f.r.r.h.l.m

Problems about pv uv in flink sql

2021-01-18 Thread Joshua Fan
Hi

I have learned from the community how to do pv/uv in Flink SQL. One way is
to group by yyyyMMdd, the other is to use a one-day window. Thank you
all.

I have a question about the result output. With the yyyyMMdd grouping, every
minute the database gets a record, and many records accumulate in the
database as time goes on, but there would be only a few records in the
database with the day window.

For example, the pv would be 12:00,100   12:01,200  12:02,300   12:03,400
with the yyyyMMdd grouping solution; with the day-window solution,
there would be only one record, like 12:00,100 |12:01,200|12:02,300|12:03,400.

I wonder, for the day-window solution, is it possible to have the same
result output as the yyyyMMdd solution? Because the day-window solution has
no worry about state retention.

Thanks.

Yours sincerely

Josh


Re: Does Flink streaming SQL support two-level group by aggregation?

2021-01-13 Thread Joshua Fan
Hi Jark and Benchao,

There are three more weird things about pv/uv in Flink SQL.

As I described in the email above, I computed pv/uv in two ways; I
list them below:

For the day-grouping one, the SQL is

> insert into pvuv_sink
> select a,v,MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) dt,
>   COUNT(m2) AS pv,
>   COUNT(DISTINCT m2) AS uv from kafkaTable GROUP BY DATE_FORMAT(ts,
> 'yyyy-MM-dd'),a,v;
>
And the result of one dimension is
[image: result_day_grouping.png]
For the one-day-window one, the SQL is

> insert into pvuv_sink
> select a,v,MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) dt,
>   COUNT(m2) AS pv,
>   COUNT(DISTINCT m2) AS uv from kafkaTable GROUP BY tumble(ts, interval '1'
> day),a,v;

And the result of one dimension is
[image: result_1day_window.png]
Here are the three questions:
1. With the same CPU, memory, and parallelism, the day-grouping solution is
faster than the one-day-window solution: the day-grouping solution took 1 hour
to consume all the data,
but the one-day-window solution took 4 hours to consume all the data.

2. The final results are not the same: the pv/uv of the day grouping is
7304086/7299878, but the pv/uv of the one-day window is 7304352/7300144. I
think both results are not exact, but approximate?
So, how much accuracy is lost? What is the algorithm behind the count
distinct?

3. As the picture of the one-day window shows, there are many records for
a=1, v=12.0.6.1, dt=2021-01-13 17:45:00, but in my last mail, I noticed the
records kept changing while the job was executing, with
one record per dimension; now, at the final time, so many records per dimension
popped up, which is weird.

Any advice will be fully appreciated.

Yours sincerely

Josh

On Wed, Jan 13, 2021 at 7:24 PM Joshua Fan  wrote:

> Hi Jark and Benchao
>
> I have learned from your previous email  on how to do pv/uv in flink sql.
> One is to make a MMdd grouping, the other is to make a day window.
> Thank you all.
>
> I have a question about the result output. For MMdd grouping, every
> minute the database would get a record, and many records would be in the
> database as time goes on, but there would be only a few records in the
> database according to the day window.
>
> for example, the pv would be 12:00,100   12:01,200  12:02,300   12:03,400
> according to the MMdd grouping solution, for the day window solution,
> there would be only one record as  12:00,100 |12:01,200|12:02,300|12:03,400.
>
> I wonder, for the day window solution, is it possible to have the same
> result output as the MMdd solution? because the day window solution has
> no worry about the state retention.
>
> Thanks.
>
> Yours sincerely
>
> Josh
>
> On Sat, Apr 18, 2020 at 9:38 PM Jark Wu  wrote:
>
>> Hi,
>>
>> I will use English because we are also sending to user@ ML.
>>
>> This behavior is as expected, not a bug. Benchao gave a good explanation
>> about the reason. I will give some further explanation.
>> In Flink SQL, we will split an update operation (such as uv from 100 ->
>> 101) into two separate messages, one is -[key, 100], the other is +[key,
>> 101].
>> Once these two messages arrive the downstream aggregation, it will also
>> send two result messages (assuming the previous SUM(uv) is 500),
>> one is [key, 400], the other is [key, 501].
>>
>> But this problem is almost addressed since 1.9, if you enable the
>> mini-batch optimization [1], because mini-batch optimization will try its
>> best to accumulate the separate + and - messages within a single
>> mini-batch. You can upgrade and have a try.
>>
>> Best,
>> Jark
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
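For readers who want to try the mini-batch optimization mentioned above, a minimal sketch of switching it on from the Table API; the option keys come from the tuning page linked in [1], while the latency and size values are just examples:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MiniBatchExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Enable mini-batch aggregation so the -/+ retract pairs are folded within a batch.
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.allow-latency", "5 s");
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.size", "5000");
    }
}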
>>
>>
>>
>> On Sat, 18 Apr 2020 at 12:26, Benchao Li  wrote:
>>
>>> By the current design, this should not be considered a bug; it is by design.
>>> The main issue is that there are two levels of aggregation, and the retract from the first-level agg causes the second-level agg to recompute and re-emit its result.
>>>
>>> On Sat, Apr 18, 2020 at 11:38 AM dixingxing85 wrote:
>>>
>>>> Thanks Benchao.
>>>> The expected result of this job is only one result per day, and that result should keep growing, for example:
>>>> 20200417,86
>>>> 20200417,90
>>>> 20200417,130
>>>> 20200417,131
>>>>
>>>> It should not bounce up and down, with the number going from larger to smaller; the consumers of the result definitely cannot accept that:
>>>> 20200417,90
>>>> 20200417,86
>>>> 20200417,130
>>>> 20200417,86
>>>> 20200417,131
>>>>
>>>> My question is whether the retract stream produced by the inner group by can affect the sink; I logged this on the sink side.
>>>> If Flink supports this kind of two-level group by, shouldn't this case of the result becoming smaller be considered a bug?
>>>>
>>>> Sent from my iPhone
>>>>
>>>> On Apr 18, 2020, at 10:08, Benchao 

Re: Does Flink streaming SQL support two-level group by aggregation?

2021-01-13 Thread Joshua Fan
Hi Jark and Benchao

I have learned from your previous emails how to do pv/uv in Flink SQL.
One way is to group by yyyyMMdd, the other is to use a one-day window.
Thank you all.

I have a question about the result output. With the yyyyMMdd grouping, every
minute the database gets a record, and many records accumulate in the
database as time goes on, but there would be only a few records in the
database with the day window.

For example, the pv would be 12:00,100   12:01,200  12:02,300   12:03,400
with the yyyyMMdd grouping solution; with the day-window solution,
there would be only one record, like 12:00,100 |12:01,200|12:02,300|12:03,400.

I wonder, for the day-window solution, is it possible to have the same
result output as the yyyyMMdd solution? Because the day-window solution has
no worry about state retention.

Thanks.

Yours sincerely

Josh

On Sat, Apr 18, 2020 at 9:38 PM Jark Wu  wrote:

> Hi,
>
> I will use English because we are also sending to user@ ML.
>
> This behavior is as expected, not a bug. Benchao gave a good explanation
> about the reason. I will give some further explanation.
> In Flink SQL, we will split an update operation (such as uv from 100 ->
> 101) into two separate messages, one is -[key, 100], the other is +[key,
> 101].
> Once these two messages arrive the downstream aggregation, it will also
> send two result messages (assuming the previous SUM(uv) is 500),
> one is [key, 400], the other is [key, 501].
>
> But this problem is almost addressed since 1.9, if you enable the
> mini-batch optimization [1], because mini-batch optimization will try its
> best to accumulate the separate + and - messages within a single mini-batch.
> You can upgrade and have a try.
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
>
>
>
> On Sat, 18 Apr 2020 at 12:26, Benchao Li  wrote:
>
>> By the current design, this should not be considered a bug; it is by design.
>> The main issue is that there are two levels of aggregation, and the retract from the first-level agg causes the second-level agg to recompute and re-emit its result.
>>
>>> On Sat, Apr 18, 2020 at 11:38 AM dixingxing85 wrote:
>>>
>>> Thanks Benchao.
>>> The expected result of this job is only one result per day, and that result should keep growing, for example:
>>> 20200417,86
>>> 20200417,90
>>> 20200417,130
>>> 20200417,131
>>>
>>> It should not bounce up and down, with the number going from larger to smaller; the consumers of the result definitely cannot accept that:
>>> 20200417,90
>>> 20200417,86
>>> 20200417,130
>>> 20200417,86
>>> 20200417,131
>>>
>>> My question is whether the retract stream produced by the inner group by can affect the sink; I logged this on the sink side.
>>> If Flink supports this kind of two-level group by, shouldn't this case of the result becoming smaller be considered a bug?
>>>
>>> Sent from my iPhone
>>>
>>> On Apr 18, 2020, at 10:08, Benchao Li  wrote:
>>>
>>> 
>>>
>>> Hi,
>>>
>>> This is supported.
>>> What you are seeing is because a group by produces retract results, i.e. it first sends -[old] and then +[new].
>>> With two levels, it becomes:
>>> first level -[old], second level -[cur], +[old]
>>> first level +[new], second level -[old], +[new]
>>>
>>> On Sat, Apr 18, 2020 at 2:11 AM dixingxin...@163.com  wrote:
>>>

 Hi all:

 We have a streaming SQL job whose result is not correct; the data the sink receives bounces up and down. *We would like to confirm whether this is a bug,
 or whether Flink does not yet support this kind of SQL*.
 The concrete scenario: first group by the two dimensions A and B to compute UV, then group by A to sum up the UV of dimension B. The corresponding SQL is as follows: (A
 -> dt,  B -> pvareaid)

 SELECT dt, SUM(a.uv) AS uv
 FROM (
SELECT dt, pvareaid, COUNT(DISTINCT cuid) AS uv
FROM streaming_log_event
WHERE action IN ('action1')
   AND pvareaid NOT IN ('pv1', 'pv2')
   AND pvareaid IS NOT NULL
GROUP BY dt, pvareaid
 ) a
 GROUP BY dt;

 The logs corresponding to the data received by the sink are:

 2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
 (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
 data(false,0,86,20200417)
 2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
 (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
 data(true,0,130,20200417)
 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
 (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
 data(false,0,130,20200417)
 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
 (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
 data(true,0,86,20200417)
 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
 (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
 data(false,0,86,20200417)
 2020-04-17 22:28:39,328INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
 (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
 data(true,0,131,20200417)


 We are using 1.7.2, and the parallelism of the test job is 1.
 This is the corresponding issue: https://issues.apache.org/jira/browse/FLINK-17228


 --
 dixingxin...@163.com


>>>
>>> --
>>>
>>> Benchao Li
>>> School of Electronics Engineering and Computer Science, Peking University
>>> Tel:+86-15650713730
>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>
>>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>


Does flink have a plan to support flink sql udf of any language?

2020-12-17 Thread Joshua Fan
Hi,

Does the Flink community have a plan to support Flink SQL UDFs in any
language? For example, a UDF in C or PHP. In my company, many
developers do not know Java or Scala; they use C in their daily work.

For now we have a workaround for this situation: we create a process
running the C logic, and that process communicates with Flink SQL through a
wrapper UDF function, roughly as sketched below.
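A rough sketch of such a wrapper; the ScalarFunction base class is Flink's Table API, while the host, port and line-based protocol are placeholders for however the external C process is actually reached:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import org.apache.flink.table.functions.ScalarFunction;

// Placeholder sketch: forwards the argument to an external process (the C logic)
// over a local socket and returns whatever that process answers. A real wrapper
// would open the connection once in open(FunctionContext) instead of per call.
public class ExternalProcessUdf extends ScalarFunction {

    public String eval(String input) {
        try (Socket socket = new Socket("127.0.0.1", 9999);
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
             BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
            out.println(input);
            return in.readLine();
        } catch (Exception e) {
            throw new RuntimeException("Call to the external UDF process failed", e);
        }
    }
}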

So, here I have two questions:
1. Does the Flink community have a plan to support Flink SQL UDFs in any
language?
2. If the answer to question 1 is no, is there any possibility of pushing our
solution to Flink? Is this UDF-process approach in line with the ideas of the
community about Flink SQL?

Thanks
Yours Sincerely
Josh


Re: why not flink delete the checkpoint directory recursively?

2020-11-25 Thread Joshua Fan
Hi Roman and Robert,

Thank you.
I have checked the code and the checkpoint-deletion failure case. Yes,
Flink deletes the metadata file and the operator state files first, then
deletes the checkpoint dir, which by then is truly an empty dir. The root cause of
the checkpoint-deletion failure is that the Hadoop delete checks the
directory and the recursive parameter. I will work with the people in charge of
our HDFS to solve this problem.
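For anyone hitting the same issue, the check in question is Hadoop's FileSystem.delete(Path, boolean) contract: a non-recursive delete of a non-empty directory fails. A hedged sketch with a made-up path:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class NonRecursiveDeleteExample {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path chkDir = new Path("/flink/checkpoints/job-xyz/chk-42"); // made-up path

        // If chk-42 still contains files, HDFS rejects this non-recursive delete
        // (typically surfacing as a PathIsNotEmptyDirectoryException wrapped in an IOException).
        // Once Flink has already removed the state files, the directory is empty
        // and the very same call succeeds, which matches Robert's description above.
        fs.delete(chkDir, false);
    }
}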
Thanks again.

Yours sincerely
Josh

On Tue, Nov 17, 2020 at 6:36 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi,
>
> I think Robert is right, state handles are deleted first, and then the
> directory is deleted non-recursively.
> If any exception occurs while removing the files, it will be combined with
> the other exception (as suppressed).
> So probably Flink failed to delete some files and then directory removal
> failed because of that.
> Can you share the full exception to check this?
> And probably check what files exist there as Robert suggested.
>
> Regards,
> Roman
>
>
> On Tue, Nov 17, 2020 at 10:38 AM Joshua Fan 
> wrote:
>
>> Hi Robert,
>>
>> When the `delete(Path f, boolean recursive)` recursive is false, hdfs
>> will throw exception like below:
>> [image: checkpoint-exception.png]
>>
>> Yours sincerely
>> Josh
>>
>> On Thu, Nov 12, 2020 at 4:29 PM Robert Metzger 
>> wrote:
>>
>>> Hey Josh,
>>>
>>> As far as I understand the code CompletedCheckpoint.discard(), Flink is
>>> removing all the files in StateUtil.bestEffortDiscardAllStateObjects, then
>>> deleting the directory.
>>>
>>> Which files are left over in your case?
>>> Do you see any exceptions on the TaskManagers?
>>>
>>> Best,
>>> Robert
>>>
>>> On Wed, Nov 11, 2020 at 12:08 PM Joshua Fan 
>>> wrote:
>>>
>>>> Hi
>>>>
>>>> When a checkpoint should be deleted,
>>>> FsCompletedCheckpointStorageLocation.disposeStorageLocation will be
>>>> called.
>>>> Inside it, fs.delete(exclusiveCheckpointDir, false) will do the delete
>>>> action. I wonder why the recursive parameter is set to false? as the
>>>> exclusiveCheckpointDir is truly a directory. in our hadoop, this
>>>> causes the checkpoint cannot be removed.
>>>> It is easy to change the recursive parameter to true, but is there any
>>>> potential harm?
>>>>
>>>> Yours sincerely
>>>> Josh
>>>>
>>>>


Re: why not flink delete the checkpoint directory recursively?

2020-11-17 Thread Joshua Fan
Hi Robert,

When the recursive parameter of `delete(Path f, boolean recursive)` is false, HDFS will
throw an exception like the one below:
[image: checkpoint-exception.png]

Yours sincerely
Josh

On Thu, Nov 12, 2020 at 4:29 PM Robert Metzger  wrote:

> Hey Josh,
>
> As far as I understand the code CompletedCheckpoint.discard(), Flink is
> removing all the files in StateUtil.bestEffortDiscardAllStateObjects, then
> deleting the directory.
>
> Which files are left over in your case?
> Do you see any exceptions on the TaskManagers?
>
> Best,
> Robert
>
> On Wed, Nov 11, 2020 at 12:08 PM Joshua Fan 
> wrote:
>
>> Hi
>>
>> When a checkpoint should be deleted, FsCompletedCheckpointStorageLocation
>> .disposeStorageLocation will be called.
>> Inside it, fs.delete(exclusiveCheckpointDir, false) will do the delete
>> action. I wonder why the recursive parameter is set to false? as the
>> exclusiveCheckpointDir is truly a directory. in our hadoop, this causes
>> the checkpoint cannot be removed.
>> It is easy to change the recursive parameter to true, but is there any
>> potential harm?
>>
>> Yours sincerely
>> Josh
>>
>>


why not flink delete the checkpoint directory recursively?

2020-11-11 Thread Joshua Fan
Hi

When a checkpoint should be deleted, FsCompletedCheckpointStorageLocation.
disposeStorageLocation is called.
Inside it, fs.delete(exclusiveCheckpointDir, false) does the delete
action. I wonder why the recursive parameter is set to false, since the
exclusiveCheckpointDir is truly a directory. In our Hadoop setup, this causes the
checkpoint to not be removed.
It is easy to change the recursive parameter to true, but is there any
potential harm?

Yours sincerely
Josh


The rpc invocation size exceeds the maximum akka framesize when the job was re submitted.

2020-08-19 Thread Joshua Fan
hi,

We have a Flink job platform which resubmits a job when it fails, without
platform user involvement. Today a resubmit failed because
of the error below; I increased akka.framesize, and the resubmit succeeded.
My question is: nothing changed in the job, the jar, the program,
or the arguments, so why did the error suddenly happen?

java.io.IOException: The rpc invocation size exceeds the maximum akka framesize.
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:247)
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:196)
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:125)
at com.sun.proxy.$Proxy28.submitTask(Unknown Source)
at 
org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:99)
at 
org.apache.flink.runtime.executiongraph.Execution.deploy(Execution.java:614)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$2(ExecutionGraph.java:970)
at 
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at 
org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:542)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:774)
at akka.dispatch.OnComplete.internal(Future.scala:259)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:19)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:434)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:433)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at 
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at 
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
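For reference, the knob mentioned above is the akka.framesize option; it normally lives in flink-conf.yaml (default 10485760b). A minimal sketch of setting it through Flink's Configuration class, with an example value of 20 MB:

import org.apache.flink.configuration.Configuration;

public class FrameSizeExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Equivalent to putting "akka.framesize: 20971520b" into flink-conf.yaml;
        // the 20 MB value here is only an example.
        conf.setString("akka.framesize", "20971520b");
        System.out.println(conf.getString("akka.framesize", "10485760b"));
    }
}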


Thanks

Joshua


Re: Fencing token exceptions from Job Manager High Availability mode

2019-10-10 Thread Joshua Fan
Sorry, I forgot the version: we run Flink 1.7 on YARN in HA mode.

On Fri, Oct 11, 2019 at 12:02 PM Joshua Fan  wrote:

> Hi Till
>
> After got your advice, I checked the log again. It seems not wholely the
> same as the condition you mentioned.
>
> I would like to summarize the story in the belowed log.
>
> Once a time, the zk connection  was not stable, so there happened 3 times
> suspended-reconnected.
>
> After the first suspended-reconnected, the Minidispatcher tried to recover
> all jobs.
>
> Then the second suspended-reconnected came, after this reconnected, there
> happened a 'The heartbeat of JobManager with id
> dbad79e0173c5658b029fba4d70e8084 timed out', and in this turn, the
> Minidispatcher didn't try to recover the job.
>
> Due to the zk connection did not recover, the third suspended-reconnected
> came, after the zk reconnected, the Minidispatcher did not try to recover
> job ,but just repeated throw FencingTokenException, the AM was hanging, our
> alarm-system just
> found the job was gone, but can not get a final state of the job. And the
> FencingTokenException was ongoing for nearly one day long before we killed
> the AM.
>
> the whole log is attached.
>
> Thanks
>
> Joshua
>
> On Fri, Oct 11, 2019 at 10:35 AM Hanson, Bruce 
> wrote:
>
>> Hi Till and Fabian,
>>
>>
>>
>> My apologies for taking a week to reply; it took some time to reproduce
>> the issue with debug logging. I’ve attached logs from a two minute period
>> when the problem happened. I’m just sending this to you two to avoid
>> sending the log file all over the place. If you’d like to have our
>> conversation in the user group mailing list, that’s fine.
>>
>>
>>
>> The job was submitted by using the job manager REST api starting at
>> 20:33:46.262 and finishing at 20:34:01.547. This worked normally, and the
>> job started running. We then run a monitor that polls the /overview
>> endpoint of the JM REST api. This started polling at 20:34:31.380 and
>> resulted in the JM throwing the FencingTokenException at 20:34:31:393, and
>> the JM returned a 500 to our monitor. This will happen every time we poll
>> until the monitor times out and then we tear down the cluster, even though
>> the job is running, we can’t tell that it is. This is somewhat rare,
>> happening maybe 5% of the time.
>>
>>
>>
>> We’re running Flink 1.7.1. This issue only happens when we run in Job
>> Manager High Availability mode. We provision two Job Managers, a 3-node
>> zookeeper cluster, task managers and our monitor all in their own
>> Kubernetes namespace. I can send you Zookeeper logs too if that would be
>> helpful.
>>
>>
>>
>> Thanks in advance for any help you can provide!
>>
>>
>>
>> -Bruce
>>
>> --
>>
>>
>>
>>
>>
>> *From: *Till Rohrmann 
>> *Date: *Wednesday, October 2, 2019 at 6:10 AM
>> *To: *Fabian Hueske 
>> *Cc: *"Hanson, Bruce" , "user@flink.apache.org" <
>> user@flink.apache.org>
>> *Subject: *Re: Fencing token exceptions from Job Manager High
>> Availability mode
>>
>>
>>
>> Hi Bruce, are you able to provide us with the full debug logs? From the
>> excerpt itself it is hard to tell what is going on.
>>
>>
>>
>> Cheers,
>>
>> Till
>>
>>
>>
>> On Wed, Oct 2, 2019 at 2:24 PM Fabian Hueske  wrote:
>>
>> Hi Bruce,
>>
>>
>>
>> I haven't seen such an exception yet, but maybe Till (in CC) can help.
>>
>>
>>
>> Best,
>>
>> Fabian
>>
>>
>>
>> Am Di., 1. Okt. 2019 um 05:51 Uhr schrieb Hanson, Bruce <
>> bruce.han...@here.com>:
>>
>> Hi all,
>>
>>
>>
>> We are running some of our Flink jobs with Job Manager High Availability.
>> Occasionally we get a cluster that comes up improperly and doesn’t respond.
>> Attempts to submit the job seem to hang and when we hit the /overview REST
>> endpoint in the Job Manager we get a 500 error and a fencing token
>> exception like this:
>>
>>
>>
>> *2019-09-21 05:04:07.785 [flink-akka.actor.default-dispatcher-4428]
>> level=ERROR o.a.f.runtime.rest.handler.cluster.ClusterOverviewHandler  -
>> Implementation error: Unhandled exception.*
>>
>> *org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing
>> token not set: Ignoring message LocalFencedMessage(null,
>> LocalRpcInvocation(requestResourceOverview(Time))) sent to
&

Re: Fencing token exceptions from Job Manager High Availability mode

2019-10-10 Thread Joshua Fan
Hi Till

After getting your advice, I checked the log again. It is not wholly the
same as the condition you mentioned.

I would like to summarize the story in the log below.

At one point, the ZooKeeper connection was not stable, so there were three
suspended-reconnected cycles.

After the first suspended-reconnected cycle, the Minidispatcher tried to
recover all jobs.

Then the second suspended-reconnected cycle came; after this reconnect, a
'The heartbeat of JobManager with id dbad79e0173c5658b029fba4d70e8084 timed
out' occurred, and in this round the Minidispatcher did not try to recover
the job.

Because the zk connection still did not recover, the third
suspended-reconnected cycle came; after the zk reconnected, the
Minidispatcher did not try to recover the job but just kept throwing
FencingTokenException. The AM was hanging, our alarm system only found that
the job was gone but could not get a final state for it, and the
FencingTokenException went on for nearly one day before we killed the AM.

The whole log is attached.

Thanks

Joshua

On Fri, Oct 11, 2019 at 10:35 AM Hanson, Bruce 
wrote:

> Hi Till and Fabian,
>
>
>
> My apologies for taking a week to reply; it took some time to reproduce
> the issue with debug logging. I’ve attached logs from a two minute period
> when the problem happened. I’m just sending this to you two to avoid
> sending the log file all over the place. If you’d like to have our
> conversation in the user group mailing list, that’s fine.
>
>
>
> The job was submitted by using the job manager REST api starting at
> 20:33:46.262 and finishing at 20:34:01.547. This worked normally, and the
> job started running. We then run a monitor that polls the /overview
> endpoint of the JM REST api. This started polling at 20:34:31.380 and
> resulted in the JM throwing the FencingTokenException at 20:34:31:393, and
> the JM returned a 500 to our monitor. This will happen every time we poll
> until the monitor times out and then we tear down the cluster, even though
> the job is running, we can’t tell that it is. This is somewhat rare,
> happening maybe 5% of the time.
>
>
>
> We’re running Flink 1.7.1. This issue only happens when we run in Job
> Manager High Availability mode. We provision two Job Managers, a 3-node
> zookeeper cluster, task managers and our monitor all in their own
> Kubernetes namespace. I can send you Zookeeper logs too if that would be
> helpful.
>
>
>
> Thanks in advance for any help you can provide!
>
>
>
> -Bruce
>
> --
>
>
>
>
>
> *From: *Till Rohrmann 
> *Date: *Wednesday, October 2, 2019 at 6:10 AM
> *To: *Fabian Hueske 
> *Cc: *"Hanson, Bruce" , "user@flink.apache.org" <
> user@flink.apache.org>
> *Subject: *Re: Fencing token exceptions from Job Manager High
> Availability mode
>
>
>
> Hi Bruce, are you able to provide us with the full debug logs? From the
> excerpt itself it is hard to tell what is going on.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Wed, Oct 2, 2019 at 2:24 PM Fabian Hueske  wrote:
>
> Hi Bruce,
>
>
>
> I haven't seen such an exception yet, but maybe Till (in CC) can help.
>
>
>
> Best,
>
> Fabian
>
>
>
> Am Di., 1. Okt. 2019 um 05:51 Uhr schrieb Hanson, Bruce <
> bruce.han...@here.com>:
>
> Hi all,
>
>
>
> We are running some of our Flink jobs with Job Manager High Availability.
> Occasionally we get a cluster that comes up improperly and doesn’t respond.
> Attempts to submit the job seem to hang and when we hit the /overview REST
> endpoint in the Job Manager we get a 500 error and a fencing token
> exception like this:
>
>
>
> *2019-09-21 05:04:07.785 [flink-akka.actor.default-dispatcher-4428]
> level=ERROR o.a.f.runtime.rest.handler.cluster.ClusterOverviewHandler  -
> Implementation error: Unhandled exception.*
>
> *org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing
> token not set: Ignoring message LocalFencedMessage(null,
> LocalRpcInvocation(requestResourceOverview(Time))) sent to
> akka.tcp://fl...@job-ef80a156-3350-4e85-8761-b0e42edc346f-jm-0.job-ef80a156-3350-4e85-8761-b0e42edc346f-jm-svc.olp-here-test-j-ef80a156-3350-4e85-8761-b0e42edc346f.svc.cluster.local:6126/user/resourcemanager
> because the fencing token is null.*
>
> *at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:59)*
>
> *at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)*
>
> *at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)*
>
> *at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)*
>
> *at akka.actor.Actor$class.aroundReceive(Actor.scala:502)*
>
> *at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)*
>
> *at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)*
>
> *at akka.actor.ActorCell.invoke(ActorCell.scala:495)*
>
> *at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)*
>
> *at 

Re: Difference between data stream window function and cep within

2019-09-18 Thread Joshua Fan
Hi Dian

Thank you for your explanation.
After having a look at the source code, the CEP within just enforces a time
interval check against each (partial) match state.
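For anyone comparing the two in code, here is a minimal, hedged sketch (the Event POJO and its getId()/getType() accessors are made up for illustration); it only shows how the two APIs are typically used, not how either is implemented:

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

// "events" is assumed to be a DataStream<Event>

// CEP: within() only bounds the time between the first and the last event of
// each (partial) match; there is no pre-defined bucketing of the stream.
Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return "A".equals(e.getType());
            }
        })
        .next("second")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return "B".equals(e.getType());
            }
        })
        .within(Time.minutes(1));
PatternStream<Event> matches = CEP.pattern(events.keyBy(Event::getId), pattern);

// DataStream window: events fall into fixed buckets that exist independently
// of the data, e.g. [0s, 60s), [60s, 120s), ...
events.keyBy(Event::getId)
        .timeWindow(Time.minutes(1))
        .reduce((a, b) -> b);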
Thank you.

Yours sincerely
Joshua

On Wed, Sep 18, 2019 at 9:41 AM Dian Fu  wrote:

> Hi Joshua,
>
> There is no tumbling/sliding window underlying the cep within
> implementation.
>
> The difference between datastream window and cep within is that:
> 1) Regarding to datastream window, the window is unified for all the
> elements (You can think that the window already exists before the input
> elements come). For example, for sliding window: (window size: 60s, slide
> size: 10s), then the windows will be [0s, 60s], [10s, 70s], [20s, 80s],
> etc. When the input elements come, they are put into the windows they
> belong to.
> 2) Regarding to cep within, it defines the maximum time interval for an
> event sequence to match the pattern. So a unified window is not suitable
> for this requirement. Regarding to the underlying implementation, for each
> matching/partial-matching sequence, the time interval between the first
> element and the last element of the sequence will be checked against the
> within interval. You can refer to [1] for details.
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/459fd929399ad6c80535255eefa278564ec33683/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java#L251
>
>
> 在 2019年9月17日,下午7:47,Joshua Fan  写道:
>
> Hi All,
>
> I'd like to know the difference between data stream window function and
> cep within, I googled this issue but found no useful information.
>
> Below the cep within, is there a tumbling window or sliding window or just
> a process function?
>
> Your explanation will be truly appreciated.
>
> Yours sincerely
>
> Joshua
>
>
>


Difference between data stream window function and cep within

2019-09-17 Thread Joshua Fan
Hi All,

I'd like to know the difference between the DataStream window function and
CEP within; I googled this but found no useful information.

Underneath CEP within, is there a tumbling window, a sliding window, or just
a process function?

Your explanation will be truly appreciated.

Yours sincerely

Joshua


Re: Maybe a flink bug. Job keeps in FAILING state

2019-06-24 Thread Joshua Fan
Hi Zhijiang

Thank you for your analysis; I agree with it. The solution may be to let the
TM exit, as you mentioned, when any type of OOM occurs, because Flink has no
control over a TM once an OOM has happened.

I filed a JIRA earlier: https://issues.apache.org/jira/browse/FLINK-12889.

I don't know whether it is worth fixing.
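For what it is worth, a hedged note: there is a TaskManager option meant to make the JVM terminate on OutOfMemoryError, which is essentially the behavior discussed above. A minimal flink-conf.yaml sketch (please double-check that the key exists in your Flink version):

# flink-conf.yaml
# Kill the TaskManager JVM process on OutOfMemoryError instead of keeping a
# possibly inconsistent process alive (default is false).
taskmanager.jvm-exit-on-oom: true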

Thank you all.

Yours sincerely
Joshua

On Fri, Jun 21, 2019 at 5:32 PM zhijiang  wrote:

> Thanks for the reminding @Chesnay Schepler .
>
> I just looked through the related logs. Actually all the five
> "Source: ServiceLog" tasks are not in terminal state on JM view, the
> relevant processes are as follows:
>
> 1. The checkpoint in task causes OOM issue which would call
> `Task#failExternally` as a result, we could see the log "Attempting to
> fail task externally" in tm.
> 2. The source task would transform state from RUNNING to FAILED and then
> starts a canceler thread for canceling task, we could see log "Triggering
> cancellation of task" in tm.
> 3. When JM starts to cancel the source tasks, the rpc call
> `Task#cancelExecution` would find the task was already in FAILED state as
> above step 2, we could see log "Attempting to cancel task" in tm.
>
> At last all the five source tasks are not in terminal states from jm log,
> I guess the step 2 might not create canceler thread successfully, because
> the root failover was caused by OOM during creating native thread in step1,
> so it might be possible that creating the canceler thread is not
> successful as well in OOM case which is unstable. If so, the source task
> would not been interrupted at all, then it would not report to JM as well,
> but the state is already changed to FAILED before.
>
> For the other vertex tasks, it does not trigger `Task#failExternally` in
> step 1, and only receives the cancel rpc from JM in step 3. And I guess at
> this time later than the source period, the canceler thread could be
> created successfully after some GCs, then these tasks could be canceled as
> reported to JM side.
>
> I think the key problem is under OOM case some behaviors are not within
> expectations, so it might bring problems. Maybe we should handle OOM error
> in extreme way like making TM exit to solve the potential issue.
>
> Best,
> Zhijiang
>
> --
> From:Chesnay Schepler 
> Send Time:2019年6月21日(星期五) 16:34
> To:zhijiang ; Joshua Fan <
> joshuafat...@gmail.com>
> Cc:user ; Till Rohrmann 
> Subject:Re: Maybe a flink bug. Job keeps in FAILING state
>
> The logs are attached to the initial mail.
>
> Echoing my thoughts from earlier: from the logs it looks as if the TM
> never even submits the terminal state RPC calls for several tasks to the JM.
>
> On 21/06/2019 10:30, zhijiang wrote:
> Hi Joshua,
>
> If the tasks(subtask 1/5,subtask 2/5,subtask 3/5,subtask 5/5) were really
> in CANCELED state on TM side, but in CANCELING state on JM side, then it
> might indicates the terminal state RPC was not received by JM. I am not
> sure whether the OOM would cause this issue happen resulting in unexpected
> behavior.
>
> In addition, you mentioned these tasks are still active after OOM and was
> called to cancel, so I am not sure what is the specific periods for your
> attached TM stack. I think it might provide help if you could provide
> corresponding TM log and JM log.
> From TM log it is easy to check the task final state.
>
> Best,
> Zhijiang
> --
> From:Joshua Fan  
> Send Time:2019年6月20日(星期四) 11:55
> To:zhijiang  
> Cc:user  ; Till Rohrmann
>  ; Chesnay Schepler
>  
> Subject:Re: Maybe a flink bug. Job keeps in FAILING state
>
> zhijiang
>
> I did not capture the job ui, the topology is in FAILING state, but the
> persistentbolt subtasks as can be seen in the picture attached in first
> mail was all canceled, and the parsebolt subtasks as described before had
> one subtask FAILED, other subtasks CANCELED, but the source subtasks had
> one subtask(subtask 4/5) CANCELED, and other subtasks(subtask 1/5,subtask
> 2/5,subtask 3/5,subtask 5/5) CANCELING,  not in a terminal state.
>
> The subtask status described above is in jm view, but in tm view, all of
> the source subtask was in FAILED, do not know why jm was not notify about
> this.
>
> As all of the failed status was triggered by a oom by the subtask can not
> create native thread when checkpointing, I also dumped the stack of the
> jvm, it shows the four subtasks(subtask 1/5,subtask 2/5,subtask 3/5,subtask
> 5/5) are still active after it throwed a oom and was called to cancel . I
> attached the jstack file in this em

Re: run python job with flink 1.7

2019-05-16 Thread Joshua Fan
When I looked into the log file, it turned out that Flink could not get the
plan when creating the plan file; the full log message is below.

107 2019-05-17 12:24:56.950 [main] ERROR
org.apache.flink.python.api.PythonPlanBinder  - Failed to run plan.
108 java.lang.RuntimeException: Plan file caused an error. Check log-files
for details.
109 at
org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.preparePlanMode(PythonPlanStreamer.java:107)
110 at
org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:178)
111 at
org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:98)
112 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
113 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
114 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
115 at java.lang.reflect.Method.invoke(Method.java:498)
116 at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
117 at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
118 at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
119 at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
120 at
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
121 at
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
122 at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
123 at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
124 at java.security.AccessController.doPrivileged(Native Method)
125 at javax.security.auth.Subject.doAs(Subject.java:422)
126 at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
127 at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
128 at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
129 2019-05-17 12:24:56.951 [main] INFO
org.apache.flink.runtime.rest.RestClient  - Shutting down rest endpoint.
130 2019-05-17 12:24:56.959 [main] INFO
org.apache.flink.runtime.rest.RestClient  - Rest endpoint shutdown complete.

I am not familiar with python. Thanks for your help.

On Fri, May 17, 2019 at 11:47 AM Joshua Fan  wrote:

> Hi all
>
> When I run the python example in flink 1.7, it always got a excepthon.
>
> The command is: ./bin/pyflink.sh ./examples/python/streaming/word_count.py
>
> The return message is:
> 2019-05-17 11:43:22,900 INFO  org.apache.hadoop.yarn.client.RMProxy
>  - Connecting to ResourceManager at
> data01.hj.shbt.qihoo.net/10.203.82.17:8832
> Starting execution of program
> Traceback (most recent call last):
>   File "/tmp/flink_plan_0a1aed4b-4155-4cfa-b2ba-8083a4a61f6e/plan.py",
> line 21, in 
> from org.apache.flink.api.common.functions import FlatMapFunction
> ImportError: No module named org.apache.flink.api.common.functions
> Failed to run plan: Plan file caused an error. Check log-files for details.
>
> The program didn't contain a Flink job. Perhaps you forgot to call
> execute() on the execution environment.
>
> Can not find any help in google.
> Appreciate your help very much.
>
> Sincerely
> Joshua
>


run python job with flink 1.7

2019-05-16 Thread Joshua Fan
Hi all

When I run the Python example in Flink 1.7, it always gets an exception.

The command is: ./bin/pyflink.sh ./examples/python/streaming/word_count.py

The return message is:
2019-05-17 11:43:22,900 INFO  org.apache.hadoop.yarn.client.RMProxy
 - Connecting to ResourceManager at
data01.hj.shbt.qihoo.net/10.203.82.17:8832
Starting execution of program
Traceback (most recent call last):
  File "/tmp/flink_plan_0a1aed4b-4155-4cfa-b2ba-8083a4a61f6e/plan.py", line
21, in 
from org.apache.flink.api.common.functions import FlatMapFunction
ImportError: No module named org.apache.flink.api.common.functions
Failed to run plan: Plan file caused an error. Check log-files for details.

The program didn't contain a Flink job. Perhaps you forgot to call
execute() on the execution environment.

I could not find any help on Google.
I appreciate your help very much.

Sincerely
Joshua


The submitting is hanging when register a hdfs file as registerCacheFile in 1.7 based on RestClusterClient

2019-02-18 Thread Joshua Fan
Hi, all

As the title says, the submission always hangs when the cache file is not
reachable, because the RestClient uses a java.io.File to read the cache file.

I use RestClusterClient to submit the job in Flink 1.7.

Below are the instructions shown in
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#distributed-cache
:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

Unfortunately, neither of the two examples can be submitted: because
hdfs:///path/to/your/file and file:///path/to/exec/file are not reachable
through java.io.File, the HTTP POST never finishes and the submission hangs.
When I use env.registerCachedFile("/path/to/exec/file", "localExecFile",
true), i.e. a regular local path, the job can be submitted and the cache file
is available.

Is there a problem in the code, or should I file a JIRA?

Yours
Joshua


Re: There is no classloader in TableFactoryUtil using ConnectTableDescriptor to registerTableSource

2019-01-15 Thread Joshua Fan
Hi Hequn

Thanks. Now I see what you mean: use tableEnv.registerTableSource
instead of StreamTableDescriptor.registerTableSource. Yes, that is a
good solution.
It would be even better if the StreamTableDescriptor itself could use a
user-defined classloader.
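For reference, a hedged sketch of the workaround with a user-defined classloader, along the lines Hequn suggested (the jar path is a placeholder, and propertiesMap stands for the property map the descriptor would otherwise produce):

import java.net.URL;
import java.net.URLClassLoader;

import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.sources.TableSource;

// classloader that can see the Kafka 0.8 (or 0.9) connector jar
URLClassLoader userClassLoader = new URLClassLoader(
        new URL[]{new URL("file:///path/to/flink-connector-kafka-0.8_2.11.jar")},
        Thread.currentThread().getContextClassLoader());

// look up the factory with the user-defined classloader instead of the default one
TableSource<?> tableSource = TableFactoryService
        .find(StreamTableSourceFactory.class, propertiesMap, userClassLoader)
        .createStreamTableSource(propertiesMap);

tableEnv.registerTableSource("test", tableSource);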
Thank you.

Yours sincerely
Joshua

On Wed, Jan 16, 2019 at 10:24 AM Joshua Fan  wrote:

> Hi Hequn
>
> Yes, the TableFactoryService has a proper method. As I
> use StreamTableDescriptor to connect to Kafka, StreamTableDescriptor
> actually uses ConnectTableDescriptor which calls TableFactoryUtil to do
> service load, and TableFactoryUtil does not use a user defined classloader,
> so I can not use `TableFactoryService.find(StreamTableSourceFactory.class,
> streamTableDescriptor, classloader)` in StreamTableDescriptor directly.
>
> One solution for me is:
> 1.add method to TableFactoryUtil to use user defined classloader.
> 2.add method to ConnectTableDescriptor accordingly.
> 3.add method to StreamTableDescriptor accordingly.
>
> But I wonder if there is a current solution to register TableSource from
> StreamTableDescriptor using user defined classloader.
>
> Your sincerely
> Joshua
>
> On Tue, Jan 15, 2019 at 8:26 PM Hequn Cheng  wrote:
>
>> Hi Joshua,
>>
>> Could you use `TableFactoryService` directly to register TableSource? The
>> code looks like:
>>
>> final TableSource tableSource =
>>> TableFactoryService.find(StreamTableSourceFactory.class,
>>> streamTableDescriptor, classloader)
>>> .createStreamTableSource(propertiesMap);
>>> tableEnv.registerTableSource(name, tableSource);
>>
>>
>> Best, Hequn
>>
>> On Tue, Jan 15, 2019 at 6:43 PM Joshua Fan 
>> wrote:
>>
>>> Hi
>>>
>>> As known, TableFactoryService has many methods to find a suitable
>>> service to load. Some of them use a user defined classloader, the others
>>> just uses the default classloader.
>>>
>>> Now I use ConnectTableDescriptor to registerTableSource in the
>>> environment, which uses TableFactoryUtil to load service, but
>>> TableFactoryUtil just use the default classloader, it is not enough in my
>>> case. Because the user may use kafka 0.8 or 0.9, the jars can not be put
>>> together in the lib directory.
>>>
>>> Is there a proper way to use ConnectTableDescriptor to
>>> registerTableSource at a user defined classloader?
>>>
>>> I know SQL Client has their now implementation to avoid
>>> use TableFactoryUtil, but I think TableFactoryUtil itself should also
>>> provide a method to use user defined classloader.
>>>
>>> Yours sincerely
>>> Joshhua
>>>
>>


Re: There is no classloader in TableFactoryUtil using ConnectTableDescriptor to registerTableSource

2019-01-15 Thread Joshua Fan
Hi Hequn

Yes, TableFactoryService has a proper method. However, I use a
StreamTableDescriptor to connect to Kafka; StreamTableDescriptor
actually uses ConnectTableDescriptor, which calls TableFactoryUtil to do the
service loading, and TableFactoryUtil does not use a user-defined classloader,
so I cannot use `TableFactoryService.find(StreamTableSourceFactory.class,
streamTableDescriptor, classloader)` from the StreamTableDescriptor directly.

One solution for me would be:
1. Add a method to TableFactoryUtil that takes a user-defined classloader.
2. Add a corresponding method to ConnectTableDescriptor.
3. Add a corresponding method to StreamTableDescriptor.

But I wonder whether there is an existing way to register a TableSource from
a StreamTableDescriptor using a user-defined classloader.

Yours sincerely
Joshua

On Tue, Jan 15, 2019 at 8:26 PM Hequn Cheng  wrote:

> Hi Joshua,
>
> Could you use `TableFactoryService` directly to register TableSource? The
> code looks like:
>
> final TableSource tableSource =
>> TableFactoryService.find(StreamTableSourceFactory.class,
>> streamTableDescriptor, classloader)
>> .createStreamTableSource(propertiesMap);
>> tableEnv.registerTableSource(name, tableSource);
>
>
> Best, Hequn
>
> On Tue, Jan 15, 2019 at 6:43 PM Joshua Fan  wrote:
>
>> Hi
>>
>> As known, TableFactoryService has many methods to find a suitable service
>> to load. Some of them use a user defined classloader, the others just uses
>> the default classloader.
>>
>> Now I use ConnectTableDescriptor to registerTableSource in the
>> environment, which uses TableFactoryUtil to load service, but
>> TableFactoryUtil just use the default classloader, it is not enough in my
>> case. Because the user may use kafka 0.8 or 0.9, the jars can not be put
>> together in the lib directory.
>>
>> Is there a proper way to use ConnectTableDescriptor to
>> registerTableSource at a user defined classloader?
>>
>> I know SQL Client has their now implementation to avoid
>> use TableFactoryUtil, but I think TableFactoryUtil itself should also
>> provide a method to use user defined classloader.
>>
>> Yours sincerely
>> Joshhua
>>
>


There is no classloader in TableFactoryUtil using ConnectTableDescriptor to registerTableSource

2019-01-15 Thread Joshua Fan
Hi

As is known, TableFactoryService has several methods to find a suitable
factory to load. Some of them take a user-defined classloader; the others just
use the default classloader.

Now I use ConnectTableDescriptor to registerTableSource in the environment,
which uses TableFactoryUtil to do the service loading, but TableFactoryUtil
just uses the default classloader, and that is not enough in my case, because
a user may use Kafka 0.8 or 0.9 and the jars cannot be put together in the lib
directory.

Is there a proper way to use ConnectTableDescriptor to registerTableSource
with a user-defined classloader?

I know the SQL Client has its own implementation to avoid using
TableFactoryUtil, but I think TableFactoryUtil itself should also
provide a method that takes a user-defined classloader.

Yours sincerely
Joshua


Re: NoMatchingTableFactoryException when test flink sql with kafka in flink 1.7

2019-01-15 Thread Joshua Fan
Hi Zhenghua

Yes, the topic was polluted somehow. After I created a new topic to consume
from, it is OK now.

Yours sincerely
Joshua

On Tue, Jan 15, 2019 at 4:28 PM Zhenghua Gao  wrote:

> May be you're generating non-standard JSON record.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: NoMatchingTableFactoryException when test flink sql with kafka in flink 1.7

2019-01-11 Thread Joshua Fan
Hi Timo

Thank you for your advice. It was indeed a typo. After I fixed it, the same
exception remained.

But when I add inAppendMode() to the StreamTableDescriptor, the
exception disappears, and it can find the proper Kafka08 factory.
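For anyone hitting the same NoMatchingTableFactoryException, a hedged sketch of the descriptor chain that matches a factory for me after the fix (the version string must match the connector jar on the classpath, and inAppendMode() is the other relevant change):

stEnv.connect(
        new Kafka()
                .version("0.8")               // matches flink-connector-kafka-0.8
                .topic("flink-test")
                .properties(properties)
                .startFromLatest())
    .withSchema(new Schema()
            .field("rideId", Types.STRING())
            .field("lon", Types.STRING()))
    .withFormat(new Json().deriveSchema())
    .inAppendMode()                           // without this, no context matches
    .registerTableSource("test");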

And another exception turns out.
Caused by:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException:
Unexpected character ('-' (code 45)): Expected space separating root-level
values
 at [Source: [B@69e1cfbe; line: 1, column: 6]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:450)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:466)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1657)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1394)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:852)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2355)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:94)

But I actually produced JSON data to the topic, so why can Flink not
deserialize it? It is weird.

Yours
Joshua

On Fri, Jan 11, 2019 at 11:02 PM Timo Walther  wrote:

> Hi Jashua,
>
> according to the property list, you passed "connector.version=0.10" so a
> Kafka 0.8 factory will not match.
>
> Are you sure you are compiling the right thing? There seems to be a
> mismatch between your screenshot and the exception.
>
> Regards,
> Timo
>
> Am 11.01.19 um 15:43 schrieb Joshua Fan:
>
> Hi,
>
> I want to test flink sql locally by consuming kafka data in flink 1.7, but
> it turns out an exception like below.
>
> Exception in thread "main"
>>> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find
>>> a suitable table factory for
>>> 'org.apache.flink.table.factories.StreamTableSourceFactory' in
>>
>> the classpath.
>>
>>
>>> Reason: No context matches.
>>
>>
>>> The following properties are requested:
>>
>> connector.properties.0.key=fetch.message.max.bytes
>>
>> connector.properties.0.value=10485760
>>
>> connector.properties.1.key=zookeeper.connect
>>
>> connector.properties.1.value=10.xxx.:2181/kafka
>>
>> connector.properties.2.key=group.id
>>
>> connector.properties.2.value=d4b53966-796e-4a2d-b6eb-a5c489db2b21
>>
>> connector.properties.3.key=bootstrap.servers
>>
>> connector.properties.3.value=10.xxx:9092
>>
>> connector.property-version=1
>>
>> connector.startup-mode=latest-offset
>>
>> connector.topic=-flink-test
>>
>> connector.type=kafka
>>
>> connector.version=0.10
>>
>> format.derive-schema=true
>>
>> format.property-version=1
>>
>> format.type=json
>>
>> schema.0.name=rideId
>>
>> schema.0.type=VARCHAR
>>
>> schema.1.name=lon
>>
>> schema.1.type=VARCHAR
>>
>>
>>> The following factories have been considered:
>>
>> org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory
>>
>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>>
>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>
>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>>
>> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
>>
>> org.apache.flink.formats.json.JsonRowFormatFactory
>>
>>
>>> at
>>> org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
>>
>> at
>>> org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
>>
>> at
>>> org.apache.fl

NoMatchingTableFactoryException when test flink sql with kafka in flink 1.7

2019-01-11 Thread Joshua Fan
Hi,

I want to test Flink SQL locally by consuming Kafka data with Flink 1.7, but
it throws an exception like the one below.

Exception in thread "main"
>> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find
>> a suitable table factory for
>> 'org.apache.flink.table.factories.StreamTableSourceFactory' in
>
> the classpath.
>
>
>> Reason: No context matches.
>
>
>> The following properties are requested:
>
> connector.properties.0.key=fetch.message.max.bytes
>
> connector.properties.0.value=10485760
>
> connector.properties.1.key=zookeeper.connect
>
> connector.properties.1.value=10.xxx.:2181/kafka
>
> connector.properties.2.key=group.id
>
> connector.properties.2.value=d4b53966-796e-4a2d-b6eb-a5c489db2b21
>
> connector.properties.3.key=bootstrap.servers
>
> connector.properties.3.value=10.xxx:9092
>
> connector.property-version=1
>
> connector.startup-mode=latest-offset
>
> connector.topic=-flink-test
>
> connector.type=kafka
>
> connector.version=0.10
>
> format.derive-schema=true
>
> format.property-version=1
>
> format.type=json
>
> schema.0.name=rideId
>
> schema.0.type=VARCHAR
>
> schema.1.name=lon
>
> schema.1.type=VARCHAR
>
>
>> The following factories have been considered:
>
> org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory
>
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
>
> org.apache.flink.formats.json.JsonRowFormatFactory
>
>
>> at
>> org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
>
> at
>> org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
>
> at
>> org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
>
> at
>> org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49)
>
> at
>> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
>
> at TableSourceFinder.main(TableSourceFinder.java:40)
>
>
here is my code:

public static void main(String[] args) throws Exception{
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment stEnv =
TableEnvironment.getTableEnvironment(env);
Kafka kafka = new Kafka();
Properties properties = new Properties();
String zkString = "10.xxx:2181/kafka";
String brokerList = "10.xxx:9092";

properties.setProperty("fetch.message.max.bytes", "10485760");
properties.setProperty("group.id", UUID.randomUUID().toString());
properties.setProperty("zookeeper.connect", zkString);
properties.setProperty("bootstrap.servers", brokerList);
kafka.version("0.8").topic("flink-test").properties(properties);
kafka.startFromLatest();
stEnv.connect(kafka).withSchema(
new Schema()
.field("rideId", Types.STRING())
.field("lon", Types.STRING()))
.withFormat(new Json().deriveSchema())

.registerTableSource("test");

Table table = stEnv.sqlQuery("select rideId from test");
DataStream<String> ds =
        ((org.apache.flink.table.api.java.StreamTableEnvironment) stEnv)
                .toAppendStream(table, Types.STRING());
ds.print();
env.execute("KafkaSql");

}


And here is my pom.xml

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

In my opinion, I have all the needed libraries in the pom; I don't know why
it fails when testing locally.

Thank you for any hints.

Yours
Joshua


Re: Using port ranges to connect with the Flink Client

2019-01-06 Thread Joshua Fan
Hi Chesnay

Yes, RestClusterClient is used in our company with Flink 1.7. It can
do almost everything except get the ClusterOverview when I want summary
information about a session cluster. In the end, I manually trigger an HTTP
GET request against the cluster to do that. It would be good if
RestClusterClient could provide a similar interface.
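For completeness, the manual workaround is just a plain REST call against the JobManager; a minimal sketch (host and port are placeholders, and the exact field names may differ by version):

curl http://<jobmanager-host>:8081/overview
# returns a small JSON summary, e.g. taskmanagers, slots-total, slots-available,
# jobs-running, jobs-finished, jobs-cancelled, jobs-failed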

Yours
Joshua

On Fri, Jan 4, 2019 at 5:28 PM Gyula Fóra  wrote:

> Hi,
>
> Thanks Chesnay my problem was fixed it was related to enabling port ranges
> for the rest client it turned out.
>
> Gyula
>
> On Fri, 4 Jan 2019 at 10:26, Chesnay Schepler  wrote:
>
>> @Gyula: From what I can tell your custom client is still relying on
>> akka, and should be using the RestClusterClient instead.
>>
>> @Joshua: Are you by change using the ClusterClient directly? Unless
>> you're working with legacy clusters, for 1.5+ you should use the
>> RestClusterClient instead.
>>
>> On 03.01.2019 08:32, Joshua Fan wrote:
>> > Hi, Gyula
>> >
>> > I met a similar situation.
>> >
>> > We used flink 1.4 before, and everything is ok.
>> >
>> > Now, we upgrade to flink 1.7 and use non-legacy mode, there seems
>> > something not ok, it all refers to that it is impossible get the
>> > jobmanagerGateway at client side. When I create a cluster without a
>> > job, I describe the cluster, flink will throw the same exception as
>> > you pointed out. When I submit a job, I want to trigger a savepoint at
>> > client side, it will also throw the same exception.
>> >
>> > Don't know why in non-legacy mode,flink will not write back the leader
>> > info into zookeeper in the path of
>> > /flink/app_9_000/leader/0/job _manager_lock. This causes
>> > all the operations fail when using the jobmanagerGateway method in
>> > ClusterClient.
>> >
>> > Hope someone can explain how to do this in a non-legacy mode.
>> >
>> > Yours sincerely
>> > Joshua
>>
>>
>>


Re: no log exists in JM and TM when updated to Flink 1.7

2019-01-02 Thread Joshua Fan
Hi Till

I found the root cause of the missing logs when using logback: Flink
does not include the logback-*.jar files in the lib folder.
After I put the logback jar files into lib, everything is OK now.
I think Flink should put the logback jar files into the lib directory, not
just the log4j jar files, because both log configuration files exist in the
conf directory.
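For others hitting this, a hedged sketch of what goes into lib/ (the exact jar versions depend on your setup; the log4j binding jars should be removed at the same time so the two backends do not clash):

# copy the logback jars (and the log4j bridge) into Flink's lib directory
cp logback-core-*.jar logback-classic-*.jar log4j-over-slf4j-*.jar $FLINK_HOME/lib/
# remove the default log4j binding so it does not conflict with logback
rm $FLINK_HOME/lib/log4j-1.2.*.jar $FLINK_HOME/lib/slf4j-log4j12-*.jar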

Yours sincerely
Joshua

On Wed, Jan 2, 2019 at 10:08 PM Till Rohrmann  wrote:

> Hi Joshua,
>
> could you check the content of the logback.xml. Maybe this file has
> changed between the versions.
>
> Cheers,
> Till
>
> On Wed, Dec 26, 2018 at 11:19 AM Joshua Fan 
> wrote:
>
>> Hi,
>>
>> It is very weird that there is no log file for JM and TM when run flink
>> job on yarn after updated flink to 1.7.on Flink 1.4.2, everything is OK. I
>> checked the log directory, there were jobmanager.error and jobmanager.out,
>> but without jobmanager.log, but the log message which should exist in
>> jobmanager.log now shows up in jobmanager.error. The taskmanager has the
>> same situation, no taskmanager.log but information exists in
>> taskmanager.error.
>>
>> below is the container lauch shell, and it seems ok.
>>
>> yarn 21784  0.0  0.0 109896  1480 ?Ss   15:13   0:00
>> /bin/bash -c /home/yarn/software/java8/bin/java -Xmx424m
>> -Dlog.file=/data03/yarn/userlogs/application_1543893582405_1760130/container_e124_1543893582405_1760130_02_01/jobmanager.log
>> -Dlogback.configurationFile=file:logback.xml
>> org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint  1>
>> /data03/yarn/userlogs/application_1543893582405_1760130/container_e124_1543893582405_1760130_02_01/jobmanager.out
>> 2>
>> /data03/yarn/userlogs/application_1543893582405_1760130/container_e124_1543893582405_1760130_02_01/
>>
>> any hints?Thanks a lot.
>>
>> Yours
>> Joshua
>>
>


Re: Using port ranges to connect with the Flink Client

2019-01-02 Thread Joshua Fan
Hi, Gyula

I met a similar situation.

We used Flink 1.4 before, and everything was OK.

Now we have upgraded to Flink 1.7 and use non-legacy mode, and something seems
wrong: it all comes down to the fact that it is impossible to get the
jobmanagerGateway on the client side. When I create a cluster without a job
and then describe the cluster, Flink throws the same exception you pointed
out. When I submit a job and want to trigger a savepoint from the client side,
it also throws the same exception.

I don't know why, in non-legacy mode, Flink does not write the leader info
back into ZooKeeper under the path
/flink/app_9_000/leader/0/job_manager_lock. This causes all the operations
that use the jobmanagerGateway method in ClusterClient to fail.

Hope someone can explain how to do this in non-legacy mode.

Yours sincerely
Joshua


Re: Can't list logs or stdout through web console on Flink 1.7 Kubernetes

2019-01-02 Thread Joshua Fan
I found the root cause of the missing logs when using logback: Flink
does not include the logback-*.jar files in the lib folder.
After I put the logback jar files into lib, everything is OK now.

On Fri, Dec 28, 2018 at 10:41 PM Chesnay Schepler 
wrote:

> @Steven: Do you happen do know whether a JIRA exists for this issue?
>
> @Joshua: Does this also happen if you use log4j?
>
> On 26.12.2018 11:33, Joshua Fan wrote:
>
> wow, I met similar situation using flink 1.7 on yarn.
>
> there was no jobmanager.log on the node but jobmanager.out and
> jobmanager.error, and jobmanager.error contains the log message. so , there
> was nothing in the webUI.
>
> I do not know why this happened. by the way, I used logback to do log
> staff.
>
> On Thu, Dec 20, 2018 at 12:50 AM Steven Nelson 
> wrote:
>
>> There is a known issue for this I believe. The problem is that the
>> containerized versions of Flink output logs to STDOUT instead of files
>> inside the node. If you pull use docker logs on the container you can see
>> what you’re looking for. I use the Kube dashboard to view the logs
>> centrally.
>>
>> Sent from my iPhone
>>
>> > On Dec 19, 2018, at 9:40 AM, William Saar  wrote:
>> >
>> >
>> > I'm running Flink 1.7 in ECS, is this a known issue or should I create
>> a jira?
>> >
>> > The web console doesn't show anything when trying to list logs or
>> stdout for task managers and the job manager log have stack traces for the
>> errors
>> > 2018-12-19 15:35:53,498 ERROR
>> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler
>> - Failed to transfer file from TaskExecutor
>> d7fd266047d5acfaddeb1156bdb23ff3.
>> > java.util.concurrent.CompletionException:
>> org.apache.flink.util.FlinkException: The file STDOUT is not available on
>> the TaskExecutor.
>> >
>> > 2018-12-19 15:36:02,538 ERROR
>> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler
>> - Failed to transfer file from TaskExecutor
>> d7fd266047d5acfaddeb1156bdb23ff3.
>> > java.util.concurrent.CompletionException:
>> org.apache.flink.util.FlinkException: The file LOG is not available on the
>> TaskExecutor.
>> >
>>
>
>


Re: Can't list logs or stdout through web console on Flink 1.7 Kubernetes

2018-12-26 Thread Joshua Fan
Wow, I met a similar situation using Flink 1.7 on YARN.

There was no jobmanager.log on the node, only jobmanager.out and
jobmanager.error, and jobmanager.error contained the log messages. So there
was nothing in the web UI.

I do not know why this happened. By the way, I use logback for logging.

On Thu, Dec 20, 2018 at 12:50 AM Steven Nelson 
wrote:

> There is a known issue for this I believe. The problem is that the
> containerized versions of Flink output logs to STDOUT instead of files
> inside the node. If you pull use docker logs on the container you can see
> what you’re looking for. I use the Kube dashboard to view the logs
> centrally.
>
> Sent from my iPhone
>
> > On Dec 19, 2018, at 9:40 AM, William Saar  wrote:
> >
> >
> > I'm running Flink 1.7 in ECS, is this a known issue or should I create a
> jira?
> >
> > The web console doesn't show anything when trying to list logs or stdout
> for task managers and the job manager log have stack traces for the errors
> > 2018-12-19 15:35:53,498 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler
> - Failed to transfer file from TaskExecutor
> d7fd266047d5acfaddeb1156bdb23ff3.
> > java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkException: The file STDOUT is not available on
> the TaskExecutor.
> >
> > 2018-12-19 15:36:02,538 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler
> - Failed to transfer file from TaskExecutor
> d7fd266047d5acfaddeb1156bdb23ff3.
> > java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkException: The file LOG is not available on the
> TaskExecutor.
> >
>


no log exists in JM and TM when updated to Flink 1.7

2018-12-26 Thread Joshua Fan
Hi,

It is very weird that there are no log files for the JM and TM when running a
Flink job on YARN after updating Flink to 1.7; on Flink 1.4.2, everything was
OK. I checked the log directory: there were jobmanager.error and
jobmanager.out, but no jobmanager.log, and the log messages that should be in
jobmanager.log now show up in jobmanager.error. The TaskManager has the same
situation: no taskmanager.log, but the information is in taskmanager.error.

Below is the container launch command, and it seems OK.

yarn 21784  0.0  0.0 109896  1480 ?Ss   15:13   0:00 /bin/bash
-c /home/yarn/software/java8/bin/java -Xmx424m
-Dlog.file=/data03/yarn/userlogs/application_1543893582405_1760130/container_e124_1543893582405_1760130_02_01/jobmanager.log
-Dlogback.configurationFile=file:logback.xml
org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint  1>
/data03/yarn/userlogs/application_1543893582405_1760130/container_e124_1543893582405_1760130_02_01/jobmanager.out
2>
/data03/yarn/userlogs/application_1543893582405_1760130/container_e124_1543893582405_1760130_02_01/

Any hints? Thanks a lot.

Yours
Joshua


Weird behavior in actorSystem shutdown in akka

2018-11-19 Thread Joshua Fan
Hi, Till and users,

There is a weird behavior in ActorSystem shutdown in Akka on our Flink
platform.
We use Flink 1.4.2 on YARN as our deploy mode, and we use a long-running
agent, based on YarnClient, to submit Flink jobs to YARN. Users can
connect to the agent to submit a job and then disconnect, but the agent is
always there. So each time a user submits a job, an ActorSystem is
created; after the job has been submitted in detached mode successfully, the
ActorSystem is shut down.
The weird thing is that an Akka error message always turns up in the JM
log after 2 days (2 days is the default value of
quarantine-after-silence in Akka), like below.

2018-11-19 09:30:34.212 [flink-akka.actor.default-dispatcher-2] ERROR
akka.remote.Remoting flink-akka.remote.default-remote-dispatcher-5 -
Association to [akka.tcp://fl...@client01v.xxx:35767] with UID
[-1757115446] irrecoverably failed. Quarantining address.
java.util.concurrent.TimeoutException: Remote system has been silent for
too long. (more than 48.0 hours)
at
akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

In the above, client01v*** is the host node where the agent runs, and
the error above turns up randomly. We trigger a savepoint through the agent
every half hour, which means the ActorSystem is created and shut down
accordingly. But only about 1 in 50 shutdowns raises an error like the one
above.
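For context, the client-side shutdown the agent performs is roughly the usual Akka 2.4-style termination sketched below (a hedged illustration, not the agent's actual code; variable names are made up). The question is why the JM side sometimes never sees the resulting Shutdown notification.

import java.util.concurrent.TimeUnit;

import akka.actor.ActorSystem;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;

// terminate the per-submission ActorSystem and wait for it to finish
actorSystem.terminate();
Await.result(actorSystem.whenTerminated(), Duration.create(30, TimeUnit.SECONDS));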

I think it may relate to the Akka system. I checked the Akka code and found
some clues, as below.
For the cases where no error is raised within two days, the JM log looks like
this:

2018-11-17 04:31:09.208 [flink-akka.actor.default-dispatcher-17] DEBUG
akka.remote.transport.ProtocolStateActor
flink-akka.remote.default-remote-dispatcher-23 - Association between local
[tcp://flink@:29448] and remote [tcp://flink@:56906] was
disassociated because the ProtocolStateActor failed: Shutdown
2018-11-17 04:31:09.208 [flink-akka.actor.default-dispatcher-17] DEBUG
akka.remote.transport.ProtocolStateActor
flink-akka.remote.default-remote-dispatcher-23 - Association between local
[tcp://flink@:29448] and remote [tcp://flink@:56906] was
disassociated because the ProtocolStateActor failed: Shutdown
2018-11-17 04:31:09.209 [flink-akka.actor.default-dispatcher-17] DEBUG
akka.remote.Remoting flink-akka.remote.default-remote-dispatcher-15 -
Remote system with address [akka.tcp://flink@:41769] has shut down.
Address is now gated for 5000 ms, all messages to this address will be
delivered to dead letters.
2018-11-17 04:31:09.209 [flink-akka.actor.default-dispatcher-17] DEBUG
akka.remote.Remoting flink-akka.remote.default-remote-dispatcher-15 -
Remote system with address [akka.tcp://flink@:41769] has shut down.
Address is now gated for 5000 ms, all messages to this address will be
delivered to dead letters.

It seems the remote actor receives the shutdown proposal; the Akka messages
may flow like below:
1. The agent shuts down the ActorSystem.
2. The EndpointReader in the JM receives an AssociationHandle.Shutdown and
throws it as a ShutDownAssociation, and the EndpointWriter will
publishAndThrow the ShutDownAssociation again.
3. When the ReliableDeliverySupervisor in the JM gets an AssociationProblem
reported by the EndpointWriter, it also throws it out.
4. When the EndpointManager in the JM gets the ShutDownAssociation exception,
the EndpointManager stops the actor.
but for the one which will raised the silent error , the log in jm like
this, seems the remote actor did not receives the shutdown proposal:

2018-11-17 09:30:29.177 [flink-akka.actor.default-dispatcher-4] DEBUG
akka.remote.transport.ProtocolStateActor
flink-akka.remote.default-remote-dispatcher-14 - Association between local
[tcp://flink@:29448] and remote [tcp://flink@:45103] was
disassociated because the ProtocolStateActor failed: Unknown
2018-11-17 09:30:29.177 [flink-akka.actor.default-dispatcher-4] DEBUG
akka.remote.transport.ProtocolStateActor
flink-akka.remote.default-remote-dispatcher-14 - Association between local
[tcp://flink@:29448] and remote [tcp://flink@:45103] was
disassociated because the ProtocolStateActor failed: Unknown
2018-11-17 09:30:29.177 [flink-akka.actor.default-dispatcher-4] WARN

Re: Get nothing from TaskManager in UI

2018-10-23 Thread Joshua Fan
Hi Vino,

the version is 1.4.2.

Yours
Joshua

On Tue, Oct 23, 2018 at 7:26 PM vino yang  wrote:

> Hi Joshua,
>
> Which version of Flink are you using?
>
> Thanks, vino.
>
> Joshua Fan  于2018年10月23日周二 下午5:58写道:
>
>> Hi All
>>
>> came into new situations, that the UI can show metric data but the data
>> remains the same all the time after days. So, there are two cases, one is
>> no data in UI at all, another is dead data in UI all the time.
>>
>> when dig into the taskmanager.log, taskmanager.error, taskmanager.out,
>> there is nothing unnormal found.
>>
>> anyone can give some hints?
>>
>> Yours sincerely
>> Joshua
>>
>> On Wed, Oct 17, 2018 at 5:05 PM Joshua Fan 
>> wrote:
>>
>>> Hi,all
>>>
>>> Frequently, for some cluster, there is  no data from Task Manager in UI,
>>> as the picture shows below.
>>> [image: tm-hang.png]
>>> but the cluster and the job is running well, just no metrics can be got.
>>> anything can do to improve this?
>>>
>>> Thanks for your assistance.
>>>
>>> Your sincerely
>>> Joshua
>>>
>>


Re: Get nothing from TaskManager in UI

2018-10-23 Thread Joshua Fan
Hi All

I have run into a new situation: the UI can show metric data, but the data
stays the same all the time, even after days. So there are two cases: one is
no data in the UI at all, the other is stale data in the UI all the time.

When digging into taskmanager.log, taskmanager.error and taskmanager.out,
nothing abnormal was found.

Can anyone give some hints?

Yours sincerely
Joshua

On Wed, Oct 17, 2018 at 5:05 PM Joshua Fan  wrote:

> Hi,all
>
> Frequently, for some cluster, there is  no data from Task Manager in UI,
> as the picture shows below.
> [image: tm-hang.png]
> but the cluster and the job is running well, just no metrics can be got.
> anything can do to improve this?
>
> Thanks for your assistance.
>
> Your sincerely
> Joshua
>


Re: Checkpointing when one of the sources has completed

2018-10-17 Thread Joshua Fan
Hi Niels,

Probably not. An operator does not start a checkpoint until it has received
barriers from all of its upstream inputs, so if one source cannot send a
barrier, the downstream operators cannot checkpoint, FYI.
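If the checkpoints must keep running, a common hedged workaround is to make the source never reach the FINISHED state, e.g. by idling instead of returning from run(); a minimal sketch (the element type and sleep interval are arbitrary):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// a source that stays RUNNING after it has emitted everything,
// so checkpoint barriers can still be injected at this source
public class IdlingSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // ... emit the actual data here, then idle instead of returning
        while (running) {
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}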

Yours sincerely
Joshua

On Wed, Oct 17, 2018 at 4:58 PM Niels van Kaam  wrote:

> Hi All,
>
> I am debugging an issue where the periodic checkpointing has halted. I
> noticed that one of the sources of my job has completed (finished). The
> other sources and operators would however still be able to produce output.
>
> Does anyone know if Flink's periodic checkpoints are supposed to continue
> when one or more sources of a job are in the "FINISHED" state?
>
> Cheers,
> Niels
>
>


Get nothing from TaskManager in UI

2018-10-17 Thread Joshua Fan
Hi,all

Frequently, for some clusters, there is no data from the TaskManager in the
UI, as the picture below shows.
[image: tm-hang.png]
But the cluster and the job are running well; just no metrics can be
retrieved. Is there anything that can be done to improve this?

Thanks for your assistance.

Yours sincerely
Joshua


How to submit a job with dependency jars by flink cli in Flink 1.4.2?

2018-08-03 Thread Joshua Fan
Hi,

I'd like to submit a job with dependency jars using flink run, but it failed.

Here is the script,

/usr/bin/hadoop/software/flink-1.4.2/bin/flink run \
-m yarn-cluster -yn 1 -ys 8 -yjm 2148 -ytm 4096 -ynm jarsTest \
-c StreamExample \
-C file:/home/work/xxx/lib/commons-math3-3.5.jar \
-C file:/home/work/xxx/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar \
...
xxx-1.0.jar

As described in
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/cli.html#usage
, "-C" means to provide the dependency jar.

After I execute the command, the job is submitted successfully, but it cannot
run in the Flink cluster on YARN. The exception is like below:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load
user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
ClassLoader info: URL ClassLoader:
file: '/home/work/xxx/lib/commons-math3-3.5.jar' (missing)
file: '/home/work/xxx/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar'
(missing)
...
Class not resolvable through given classloader.
at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:232)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:95)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)

It appears that the two dependency jars cannot be found by the TaskManager, so
I dug into the source code, from CliFrontend to PackagedProgram to
ClusterClient to JobGraph. It seems the dependency jars are put on the
classpath and into the userCodeClassLoader in PackagedProgram, but they are
never uploaded to the BlobServer in JobGraph, where the xxx-1.0.jar is
uploaded.
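For what it is worth, two hedged workarounds that avoid relying on -C being resolvable on the cluster side: build a fat (shaded) jar that already contains the dependencies, or ship the dependency directory to YARN with the -yt/--yarnship option, which (as far as I understand) also puts the shipped files on the container classpath; a sketch:

# ship a local lib directory to the YARN containers (paths are placeholders)
/usr/bin/hadoop/software/flink-1.4.2/bin/flink run \
  -m yarn-cluster -yn 1 -ys 8 -yjm 2148 -ytm 4096 -ynm jarsTest \
  -yt /home/work/xxx/lib \
  -c StreamExample \
  xxx-1.0.jar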

Am I missing something? Are dependency jars not supported in Flink 1.4.2?

I hope someone can give me some hints.

I appreciate it very much.


Yours Sincerely

Joshua