Re: Can I only use checkpoints instead of savepoints in production?

2018-08-27 Thread Averell
Thank you Vino. 

I put the message in a  tag, and I don't know why it was not shown in the
email thread. I paste the error message below in this email.

Anyway, it seems that was an issue with enabling checkpointing. Now I am
able to get it turned on properly, and my job is getting restored
automatically.
I am trying to test my scenarios now. Found some issues, and I think it
would be better to ask in a separate thread.
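
For reference, the checkpointing part of this boils down to a few lines on the execution environment. A minimal sketch (the 10-second interval, class name and pipeline are placeholders, not Averell's actual job; it also assumes state.checkpoints.dir is set in flink-conf.yaml so retained checkpoints have somewhere to live):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // take a checkpoint every 10 seconds
        env.enableCheckpointing(10_000);

        // keep the latest checkpoint when the job is cancelled, so it can be
        // used like a savepoint for manual restarts (flink run -s <path>)
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // placeholder pipeline; replace with the real sources and sinks
        env.fromElements(1, 2, 3).print();

        env.execute("checkpointed job");
    }
}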

Thanks and regards,
Averell

=
org.apache.flink.client.program.ProgramInvocationException: Job failed.
(JobID: 457d8f370ef8a50bb462946e1f12b80e)
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:661)
..
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager
with id container_1535279282999_0032_01_13 timed out.
at
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1610)
at
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Can I only use checkpoints instead of savepoints in production?

2018-08-27 Thread vino yang
Hi Averell,

This problem is caused by a heartbeat timeout between the JM and the TM. You can
narrow it down by:
1) checking the network status of the node at the time, e.g. whether
connections to other systems were equally problematic;
2) checking the TM log to see if there is a more specific reason;
3) checking the load on the node at the time the timeout occurred;
4) confirming whether something such as a full GC caused the JVM
process to be stuck at the time.

Also, I don't know if you are using the default heartbeat timeout; if so, you can
increase it appropriately.
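
For reference, the timeout vino mentions is configured in flink-conf.yaml; the values below are only illustrative, not recommendations (in this Flink version the defaults are a 10 s heartbeat interval and a 50 s timeout):

heartbeat.interval: 10000
heartbeat.timeout: 120000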

Thanks, vino.

Averell wrote on Mon, Aug 27, 2018 at 3:00 PM:

> Thank you Vino.
>
> I put the message in a  tag, and I don't know why it was not shown in the
> email thread. I paste the error message below in this email.
>
> Anyway, it seems that was an issue with enabling checkpointing. Now I am
> able to get it turned on properly, and my job is getting restored
> automatically.
> I am trying to test my scenarios now. Found some issues, and I think it
> would be better to ask in a separate thread.
>
> Thanks and regards,
> Averell
>


Re: would you join a Slack workspace for Flink?

2018-08-27 Thread Fabian Hueske
Hi,

I don't think that recommending Gists is a good idea.
Sure, well formatted and highlighted code is nice and much better than
posting screenshots but Gists can be deleted.
Deleting a Gist would make an archived thread useless.
I would definitely support instructions on how to add code to a mail.

Regarding the overall topic of some kind of chat room.
I would not participate in that.
The Flink user mailing list is one of the most active user lists of the
ASF.
It's already quite challenging to keep up with the loads of mails.
Adding a synchronous channel to that would make things worse for me.
(That's probably also one of the reasons why the IRC channel is abandoned.)
I can of course only speak for myself, but I would imaging that many
members of the community who are helping out on the mailing list feel the
same.

Best,
Fabian

On Sun, Aug 26, 2018 at 15:17, Nicos Maris <nicos.ma...@gmail.com> wrote:

> Hi Dominik,
>
> I was writing about gitter just now :)
>
> If searchability is an issue, then indeed we could consider the free plan
> of gitter: https://billing.gitter.im
>
>
> In any case, we should instruct users who paste code snippets at the
> mailing list to use http://gist.github.com
>
>
> On Sun, Aug 26, 2018 at 4:13 PM Dominik Wosiński  wrote:
>
>>
>>
>> -- Forwarded message -
>> From: Dominik Wosiński 
>> Date: Sun, 26 Aug 2018 at 15:12
>> Subject: ODP: would you join a Slack workspace for Flink?
>> To: Hequn Cheng 
>>
>>
>> Hey,
>> I have been facing this issue in multiple open-source projects and
>> discussions. Slack, in my opinion, has two main issues:
>>
>>  - the already-mentioned issue with searchability through a search engine
>>
>>  - Slack is still a commercial application.
>>
>> The second issue is quite important, because the free version of Slack keeps
>> only 10k messages of history. For Flink this would probably mean losing all
>> messages older than a week. This is a big issue, as it would most certainly
>> lead to the same questions being asked over and over again. I’ve seen really
>> big Slack groups for some big projects where the history would last only 3-4
>> days, and it is a pure nightmare.
>>
>> IMHO, gitter would be a better solution than Slack if there is a need for
>> such a channel of communication.
>>
>> Best Regards,
>> Dominik.
>>
>>
>>
>> Sent from the Mail app for Windows 10
>>
>>
>>
>> *From: *Hequn Cheng
>> *Sent: *Sunday, August 26, 2018 14:37
>> *To: *Nicos Maris
>> *Cc: *ches...@apache.org; user
>> *Subject: *Re: would you join a Slack workspace for Flink?
>>
>>
>>
>> Hi Nicos,
>>
>>
>>
>> Thanks for bringing up this discussion. :-)
>>
>> Slack is a good way to communicate, but it does not seem to be a great fit
>> for the open-source world. The messages on Slack get mixed up and cannot be
>> found through a search engine.
>>
>>
>>
>> Best, Hequn
>>
>>
>>
>> On Sun, Aug 26, 2018 at 7:22 PM Nicos Maris 
>> wrote:
>>
>> Chesnay can you take a look at the following PR?
>>
>>
>>
>> https://github.com/apache/flink-web/pull/120
>>
>>
>>
>> On Sun, Aug 26, 2018 at 1:09 PM Chesnay Schepler 
>> wrote:
>>
>> There have been previous discussions around using slack and they were
>> rejected.
>>
>> Personally I would just remove the IRC channel; I'm not aware of any
>> committer actually spending time there.
>>
>> On 25.08.2018 17:07, Nicos Maris wrote:
>>
>>
>>
>> Hi all,
>>
>>
>>
>>
>>
>> This mailing list is for user support and questions. If you would also
>> use slack for user support and questions, then please vote at the following
>> ticket. If you don't have an account at that jira, you can reply to this
>> email with a "+1".
>>
>>
>>
>>
>>
>> [FLINK-10217 ] use
>> Slack for user support and questions
>>  Current status
>>
>> For user support and questions, users are instructed to subscribe to
>> user@flink.apache.org but there are users like me who enjoy using also a
>> chat channel. However, the instructions to do so are not clear and the IRC
>> activity is low and it is definitely not indicative of the project's
>> activity
>> 
>> .
>>
>> The website  mentions that "If
>> you want to talk with the Flink committers and users in a chat, there is an 
>> IRC
>> channel ."
>> Option 1: Use Slack
>>
>> An example of an Apache project that is using Slack
>>  is:
>> http://mesos.apache.org/community
>>
>> I can assist on setting it up if at least one expert joins from the very
>> beginning.
>> Option 2: Keep using IRC and document it
>>
>> Add the missing section
>> 

Re: [DISCUSS] Remove the slides under "Community & Project Info"

2018-08-27 Thread Fabian Hueske
I agree to remove the slides section.
A lot of the content is out-dated and hence not only useless but might
sometimes even cause confusion.

Best,
Fabian



On Mon, Aug 27, 2018 at 08:29, Renjie Liu <liurenjie2...@gmail.com> wrote:

> Hi Stephan,
> Can we keep the project wiki somewhere? I think it's a great place to find
> the list of FLIPs.
>
> On Mon, Aug 27, 2018 at 9:56 AM vino yang  wrote:
>
>> +1
>> The reason is the same as Hequn's: we already provide a link to
>> SlideShare under the "Flink Forward" section.
>>
>> Thanks, vino.
>>
>> Hequn Cheng wrote on Mon, Aug 27, 2018 at 9:31 AM:
>>
>>> Hi Stephan,
>>>
>>> Thanks for bringing up this discussion.
>>> I think we can just remove it, because slides are already provided in
>>> the `Flink Forward` section of this page. Adding another slides
>>> section not only adds redundancy but also introduces the problems you listed
>>> above.
>>>
>>> Best, Hequn
>>>
>>> On Sun, Aug 26, 2018 at 11:13 PM Stephan Ewen  wrote:
>>>
 Hi all!

 In the past, we collected slide sets under the "Community & Project Info"
 page.

 I would like to see what the community thinks about removing them. There
 are currently several issues:

   - The list is not well maintained. There are, for example, no 2018 slides at all.
   - Many slide sets are outdated. For example, the 2014 Flink internals are not
 really relevant any more.
   - When searching for certain terms, the YouTube videos and SlideShare
 uploads appear first anyway.
   - The list may create a wrong impression about having definitive resources.

 Best,
 Stephan

>>> --
> Liu, Renjie
> Software Engineer, MVAD
>


Re: would you join a Slack workspace for Flink?

2018-08-27 Thread Chesnay Schepler
I fully agree with Fabian, we'd just be replacing an empty IRC with an 
empty gitter.


Re: Can I only use checkpoints instead of savepoints in production?

2018-08-27 Thread Averell
Hi Vino,

Could you please tell where I should find the JM and TM logs? I'm running on
an AWS EMR using yarn.

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Can I only use checkpoints instead of savepoints in production?

2018-08-27 Thread vino yang
Hi Averell,

I have not used AWS products, but if it works like plain YARN and you can
reach YARN's web UI:
look at the YARN ApplicationMaster log to view the JM log; the
container logs are the TM logs.

Thanks, vino.

Averell wrote on Mon, Aug 27, 2018 at 4:09 PM:

> Hi Vino,
>
> Could you please tell where I should find the JM and TM logs? I'm running
> on
> an AWS EMR using yarn.
>
> Thanks and best regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Dealing with Not Serializable classes in Java

2018-08-27 Thread Chesnay Schepler

The null check in the method is the general-purpose way of solving it.
If the ObjectMapper is thread-safe you could also initialize it as a 
static field.


On 26.08.2018 17:58, Dominik Wosiński wrote:

Hey,

I was wondering how you normally deal with fields that contain
references that are not serializable. Say we have a custom
serialization schema in Java that needs to serialize a
LocalDateTime field with an ObjectMapper. This requires registering a
specific module on the ObjectMapper, which makes it non-serializable
(the module contains some references to classes that are not serializable).
Now, if you initialize the ObjectMapper directly in the field, this
will cause an exception when deploying the job.

Normally I would do:

@Override
public byte[] serialize(Backup backupMessage) {
    if (objectMapper == null) {
        objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
    }
    ...
}

But I was wondering whether you have any prettier option for doing this?


Thanks,
Dominik.





Re: Dealing with Not Serializable classes in Java

2018-08-27 Thread Dominik Wosiński
Hey Paul,
Yeah that is possible, but I was asking in terms of serialization schema.
So I would really want to avoid RichFunction :)

Best Regards,
Dominik.

On Mon, 27 Aug 2018 at 10:23, Chesnay Schepler wrote:

> The null check in the method is the general-purpose way of solving it.
> If the ObjectMapper is thread-safe you could also initialize it as a
> static field.
>
> On 26.08.2018 17:58, Dominik Wosiński wrote:
>
> Hey,
>
> I was wondering how do You normally deal with fields that contain
> references that are not serializable. Say, we have a custom serialization
> schema in Java that needs to serialize *LocalDateTime* field with
> *ObjectMapper.*  This requires registering specific module for
> *ObjectMapper* and this makes it not serializable (module contains some
> references to classes that are not serializable).
> Now, if You would initialize *ObjectMapper *directly in the field this
> will cause an exception when deploying the job.
>
> Normally I would do :
>
> @Override
> public byte[] serialize(Backup backupMessage) {
>     if (objectMapper == null) {
>         objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
>     }
>     ...
> }
>
> But I was wondering whether do You have any prettier option of doing this?
>
> Thanks,
> Dominik.
>
>
>


Re: Dealing with Not Serializable classes in Java

2018-08-27 Thread Chesnay Schepler

You don't need RichFunctions for that, you should be able to just do:

private static final ObjectMapper objectMapper =
new ObjectMapper().registerModule(new JavaTimeModule());
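
Put into the serialization-schema context Dominik asked about, a minimal sketch (Backup is the message type from this thread; the JSON mapping itself is an assumption, not Dominik's actual schema):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.common.serialization.SerializationSchema;

public class BackupSerializationSchema implements SerializationSchema<Backup> {

    // created once per JVM/classloader and never shipped with the schema
    // instance, so the non-serializable module is not a problem; Jackson's
    // ObjectMapper is thread-safe for serialization
    private static final ObjectMapper MAPPER =
            new ObjectMapper().registerModule(new JavaTimeModule());

    @Override
    public byte[] serialize(Backup backupMessage) {
        try {
            return MAPPER.writeValueAsBytes(backupMessage);
        } catch (Exception e) {
            throw new RuntimeException("Could not serialize backup message", e);
        }
    }
}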




Re: Small-files source - partitioning based on prefix of file

2018-08-27 Thread Averell
Hello Fabian, and all,

Please excuse me for digging this old thread up.
I have a question regarding sending of the "barrier" messages in Flink's
checkpointing mechanism: I want to know when those barrier messages are sent
when I am using a file source. Where can I find it in the source code?

I'm still working on my 20,000-small-files issue, where all those 20K
files appear to the ContinuousFileMonitoringFunction at the same time.
It takes only a few seconds to list all those files, but it is expected
to take about 5 minutes to have those 20K files processed through to my sink.
Due to a resource limitation issue, my job fails after about 3 minutes.
What happens after that is the job crashes, gets restored, tries to
process all 20K files from file 1 again, and ultimately fails again after 3
minutes... It goes into an indefinite loop.

I think this is the expected behaviour, as my current checkpoint config
is to checkpoint every 10s, and it took only a second or two to list
those 20K files. Am I correct here? And do we have a solution for this?
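
One thing that at least stops the endless restart loop while the root cause is being investigated: when checkpointing is enabled and no restart strategy is set explicitly, the job restarts indefinitely by default, so capping the number of attempts turns the loop into a clean failure. A sketch with placeholder values (this does not answer the barrier question and is not a fix for the resource problem itself):

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategySetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // restart at most 3 times, 10 seconds apart, then fail the job permanently
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    }
}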

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Dealing with Not Serializable classes in Java

2018-08-27 Thread Dominik Wosiński
Hey ;)
I have received one response that was sent directly to my email and not to
user group :

> Hi Dominik,
>
> I think you can put the unserializable fields into RichFunctions and
> initiate them in the `open` method, so the the fields won’t need to be
> serialized with the tasks.
>
> Best,
> Paul Lam
>

And my response about RichFunctions was meant for Paul :)

On Mon, 27 Aug 2018 at 10:38, Chesnay Schepler wrote:

> You don't need RichFunctions for that, you should be able to just do:
>
> private static final ObjectMapper objectMapper =
>     new ObjectMapper().registerModule(new JavaTimeModule());
>


Re: Can I only use checkpoints instead of savepoints in production?

2018-08-27 Thread Averell
Thank you, Vino.
I found it, http://:8088/ 

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Data loss when restoring from savepoint

2018-08-27 Thread Andrey Zagrebin
Hi,

True, StreamingFileSink does not support S3 in 1.6.0; it is planned for the
next 1.7 release, sorry for the confusion.
The old BucketingSink has a general problem with S3. Internally, BucketingSink
queries S3 as a file system
to list the already written file parts (batches) and determine the index of the
next part to start. Due to the eventual consistency of checking file existence in S3
[1], the BucketingSink can rewrite a previously written part and basically
lose it. This should be fixed for StreamingFileSink in 1.7, where Flink keeps its
own track of written parts and does not rely on S3 as a file system.
I have also included Kostas; he might add more details.

Just to keep this S3 problem in mind and rule it out for sure, I would also
check whether the amount of missing events is around the batch size of
BucketingSink or not. You also wrote that the timestamps of the lost events are
'probably' around the time of the savepoint; if that is not yet certain, I would
also verify it.

Have you already checked the log files of the job manager and task managers for the
job running before and after the restore from the checkpoint? Is everything
successful there, with no errors, relevant warnings or exceptions?

As a next step, I would suggest logging all encountered events in
DistinctFunction.reduce, if possible for production data, and checking whether the
missed events are eventually processed before or after the savepoint. The
following log message marks the border between the events that should be
included in the savepoint (logged before it) and those that should not:
“{} ({}, synchronous part) in thread {} took {} ms” (template)
Also check whether the savepoint completed overall:
"{} ({}, asynchronous part) in thread {} took {} ms."

Best,
Andrey

[1] https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html
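
For reference, the StreamingFileSink Andrey refers to is wired up roughly as below in 1.6.0 (the path and the socket source are placeholders; per this thread, S3 paths are not supported in 1.6.0, so the sketch sticks to HDFS):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class StreamingFileSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // in-progress part files are finalized on checkpoints
        env.enableCheckpointing(60_000);

        DataStream<String> events = env.socketTextStream("localhost", 9999); // placeholder source

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("hdfs:///tmp/output"), new SimpleStringEncoder<String>("UTF-8"))
                .build();
        events.addSink(sink);

        env.execute("streaming file sink example");
    }
}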

> On 24 Aug 2018, at 20:41, Juho Autio  wrote:
> 
> Hi,
> 
> Using StreamingFileSink is not a convenient option for production use for us 
> as it doesn't support s3*. I could use StreamingFileSink just to verify, but 
> I don't see much point in doing so. Please consider my previous comment:
> 
> > I realized that BucketingSink must not play any role in this problem. This 
> > is because only when the 24-hour window triggers, BucketingSink gets a 
> > burst of input. Around the state restoring point (middle of the day) it 
> > doesn't get any input, so it can't lose anything either (right?).
> 
> I could also use a kafka sink instead, but I can't imagine how there could be 
> any difference. It's very real that the sink doesn't get any input for a long 
> time until the 24-hour window closes, and then it quickly writes out 
> everything because it's not that much data eventually for the distinct values.
> 
> Any ideas for debugging what's happening around the savepoint & restoration 
> time?
> 
> *) I actually implemented StreamingFileSink as an alternative sink. This was 
> before I came to realize that most likely the sink component has nothing to 
> do with the data loss problem. I tried it with s3n:// path just to see an 
> exception being thrown. In the source code I indeed then found an explicit 
> check for the target path scheme to be "hdfs://". 
> 
> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin  > wrote:
> Ok, I think before further debugging the window reduced state, 
> could you try the new ‘StreamingFileSink’ [1] introduced in Flink 1.6.0 
> instead of the previous 'BucketingSink’?
> 
> Cheers,
> Andrey
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>  
> 
> 
>> On 24 Aug 2018, at 18:03, Juho Autio > > wrote:
>> 
>> Yes, sorry for my confusing comment. I just meant that it seems like there's 
>> a bug somewhere now that the output is missing some data.
>> 
>> > I would wait and check the actual output in s3 because it is the main 
>> > result of the job
>> 
>> Yes, and that's what I have already done. There seems to be always some data 
>> loss with the production data volumes, if the job has been restarted on that 
>> day.
>> 
>> Would you have any suggestions for how to debug this further?
>> 
>> Many thanks for stepping in.
>> 
>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin > > wrote:
>> Hi Juho,
>> 
>> So it is a per key deduplication job.
>> 
>> Yes, I would wait and check the actual output in s3 because it is the main 
>> result of the job and
>> 
>> > The late data around the time of taking savepoint might be not included 
>> > into the savepoint but it should be behind the snapshotted offset in Kafka.
>> 
>> is not a bug, it is a possible behaviour.
>> 
>> The savepoint is a snapshot of the data in transient which is already 
>> consumed from Kafka.
>> Basically the full contents of the window result is split between the 
>> savepoint and what can come aft

Re: Question about QueryableState

2018-08-27 Thread Pierre Zemb
Hi!
Just created the JIRA (https://issues.apache.org/jira/browse/FLINK-10225).

Thanks for your reply,
Pierre
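
For readers of the thread below: the client side of queryable state is used roughly as follows; the host, port, key and job id are placeholders built around Pierre's description (9069 is the default proxy port), not working values:

import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QueryableStateLookup {
    public static void main(String[] args) throws Exception {
        // must point at a TaskManager running the queryable state client proxy
        QueryableStateClient client = new QueryableStateClient("tm-host", 9069);

        ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>("repo-status", BasicTypeInfo.STRING_TYPE_INFO);

        CompletableFuture<ValueState<String>> future = client.getKvState(
                JobID.fromHexString("3ac3bc00b2d5bc0752917186a288d40a"),
                "repo-status",                   // name the state was registered under
                "some-key",                      // key to look up
                BasicTypeInfo.STRING_TYPE_INFO,  // key type
                descriptor);

        System.out.println(future.get().value());
    }
}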

On Thu, Aug 23, 2018 at 14:31, Kostas Kloudas wrote:

> Hi Pierre,
>
> You are right that this should not happen.
> It seems like a bug.
> Could you open a JIRA and post it here?
>
> Thanks,
> Kostas
>
>
> On Aug 21, 2018, at 9:35 PM, Pierre Zemb 
> wrote:
>
> Hi!
>
> I’ve started to deploy a small Flink cluster (4tm and 1jm for now on
> 1.6.0), and deployed a small job on it. Because of the current load, job is
> completely handled by a single tm. I’ve created a small proxy that is using
> QueryableStateClient
> 
> to access the current state. It is working nicely, except under certain
> circumstances. It seems to me that I can only access the state through a
> node that is holding a part of the job. Here’s an example:
>
>- job on tm1. Pointing QueryableStateClient to tm1. State accessible
>- job still on tm1. Pointing QueryableStateClient to tm2 (for
>example). State inaccessible
>- killing tm1, job is now on tm2. State accessible
>- job still on tm2. Pointing QueryableStateClient to tm3. State
>inaccessible
>- adding some parallelism to spread job on tm1 and tm2. Pointing
>QueryableStateClient to either tm1 and tm2 is working
>- job still on tm1 and tm2. Pointing QueryableStateClient to tm3.
>State inaccessible
>
> When the state is inaccessible, I can see this (generated here
> 
> ):
>
> java.lang.RuntimeException: Failed request 0.
>  Caused by: 
> org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could 
> not retrieve location of state=repo-status of 
> job=3ac3bc00b2d5bc0752917186a288d40a. Potential reasons are: i) the state is 
> not ready, or ii) the job does not exist.
> at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
> at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
> at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
> at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
> at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
> at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> From the documentation, I can see that:
>
> The client connects to a Client Proxy running on a given Task Manager. The
> proxy is the entry point of the client to the Flink cluster. It forwards
> the requests of the client to the Job Manager and the required Task
> Manager, and forwards the final response back the client.
>
> Did I miss something? Is the QueryableStateClientProxy only fetching info
> from a job that is running on his local tm? If so, is there a way to
> retrieve the job-graph? Or maybe another solution?
>
> Thanks!
> Pierre Zemb
> ​
> --
> Cordialement,
> Pierre Zemb
> pierrezemb.fr
> Software Engineer, Metrics Data Platform @OVH
>
>
> --
Cordialement,
Pierre Zemb
pierrezemb.fr
Software Engineer, Metrics Data Platform @OVH


Re: Question about QueryableState

2018-08-27 Thread Kostas Kloudas
Thanks a lot Pierre!

Kostas




JobGraphs not cleaned up in HA mode

2018-08-27 Thread Encho Mishinev
I am running Flink 1.5.3 with two job managers and two task managers in 
Kubernetes along with HDFS and Zookeeper in high-availability mode.

My problem occurs after the following actions:
- Upload a .jar file to jobmanager-1
- Run a streaming job from the jar on jobmanager-1
- Wait for 1 or 2 checkpoints to succeed
- Kill pod of jobmanager-1
After a short delay, jobmanager-2 takes leadership and correctly restores the 
job and continues it
- Stop job from jobmanager-2

At this point all seems well, but the problem is that jobmanager-2 does not 
clean up anything that was left from jobmanager-1. This means that both in HDFS 
and in Zookeeper remain job graphs, which later on obstruct any work of both 
managers as after any reset they unsuccessfully try to restore a non-existent 
job and fail over and over again.

I am quite certain that jobmanager-2 does not know about any of jobmanager-1’s 
files since the Zookeeper logs reveal that it tries to duplicate job folders:

2018-08-27 13:11:00,038 [myid:] - INFO  [ProcessThread(sid:0 
cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException when 
processing sessionid:0x1657aa15e480033 type:create cxid:0x46 zxid:0x1ab 
txntype:-1 reqpath:n/a Error 
Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77
 Error:KeeperErrorCode = NodeExists for 
/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77

2018-08-27 13:11:02,296 [myid:] - INFO  [ProcessThread(sid:0 
cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException when 
processing sessionid:0x1657aa15e480033 type:create cxid:0x5c zxid:0x1ac 
txntype:-1 reqpath:n/a Error 
Path:/flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15 
Error:KeeperErrorCode = NodeExists for 
/flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15

Also jobmanager-2 attempts to delete the jobgraphs folder in Zookeeper when the 
job is stopped, but fails since there are leftover files in it from 
jobmanager-1:

2018-08-27 13:12:13,406 [myid:] - INFO  [ProcessThread(sid:0 
cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException when 
processing sessionid:0x1657aa15e480033 type:delete cxid:0xa8 zxid:0x1bd 
txntype:-1 reqpath:n/a Error 
Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15 
Error:KeeperErrorCode = Directory not empty for 
/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15

I’ve noticed that when restoring the job, it seems like jobmanager-2 does not 
get anything more than jobID, while it perhaps needs some metadata? Here is the 
log that seems suspicious to me:

2018-08-27 13:09:18,113 INFO  
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
Recovered SubmittedJobGraph(83bfa359ca59ce1d4635e18e16651e15, null).

All other logs seem fine in jobmanager-2, it doesn’t seem to be aware that it’s 
overwriting anything or not deleting properly.

My question is - what is the intended way for the job managers to correctly 
exchange metadata in HA mode and why is it not working for me?

Thanks in advance!
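
For reference, the ZooKeeper HA settings involved here typically look like the following in flink-conf.yaml (addresses and paths are placeholders chosen to match the znodes in the logs above). The job graphs themselves are written to high-availability.storageDir and only pointers to them are kept in ZooKeeper, so both job managers must see the same storage directory and cluster-id:

high-availability: zookeeper
high-availability.zookeeper.quorum: zookeeper:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /default
high-availability.storageDir: hdfs:///flink/ha/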

Re: Low Performance in High Cardinality Big Window Application

2018-08-27 Thread Ning Shi
> If you have a window larger than hours then you need to rethink your 
> architecture - this is not streaming anymore. Only because you receive events 
> in a streamed fashion you don’t need to do all the processing in a streamed 
> fashion.

Thanks for the thoughts, I’ll keep that in mind. However, in the test, it was 
not storing more than two days worth of data yet. I’m very much interested in 
understanding the root cause of the low performance before moving on to do 
major restructuring.

> Can you store the events in a file or a database and then do after 30 days 
> batch processing on them?

The 30 day window is just used for deduplication, but it triggers for every 
event and sends the result out to downstream so that we can still get real-time 
analytics on the events.

> Another aspect could be also to investigate why your source sends duplicated 
> entries.

They are not 100% duplicate events syntactically. The events are only 
duplicates from a logical sense. For example, the same person doing the same 
action multiple times at different time of day.

Ning

Re: AvroSchemaConverter and Tuple classes

2018-08-27 Thread françois lacombe
Thank you all for your answers.

It's OK with a BatchTableSource.


All the best

François
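
For anyone landing on this thread later, a small sketch of the Row-based approach discussed below; the inline Avro schema and the literal Row are placeholders, and it assumes flink-avro and the Table API are on the classpath:

import java.util.Collections;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class AvroRowToTable {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // derive the Row type from an Avro schema instead of hard-coding a TupleN
        String avroSchema =
                "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
                + "{\"name\":\"field1\",\"type\":\"int\"},"
                + "{\"name\":\"field2\",\"type\":\"string\"}]}";
        TypeInformation<Row> rowType = AvroSchemaConverter.convertToTypeInfo(avroSchema);

        // build Rows however the CSV is parsed; here just one literal Row
        DataSet<Row> rows =
                env.fromCollection(Collections.singletonList(Row.of(42, "hello")), rowType);

        // field names come from the Avro schema via the RowTypeInfo
        Table table = tEnv.fromDataSet(rows);
        tEnv.toDataSet(table, Row.class).print();
    }
}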

2018-08-26 17:40 GMT+02:00 Rong Rong :

> Yes you should be able to use Row instead of Tuple in your
> BatchTableSink.
> There's sections in Flink documentation regarding mapping of data types to
> table schemas [1]. and table can be converted into various typed DataStream
> [2] as well. Hope these are helpful.
>
> Thanks,
> Rong
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.6/dev/table/common.html#mapping-of-data-types-to-table-schema
> [2] https://ci.apache.org/projects/flink/flink-docs-
> release-1.6/dev/table/common.html#convert-a-table-into-a-
> datastream-or-dataset
>
>
>
> On Fri, Aug 24, 2018 at 8:04 AM françois lacombe <
> francois.laco...@dcbrain.com> wrote:
>
>> Hi Timo,
>>
>> Thanks for your answer
>> I was looking for a Tuple as to feed a BatchTableSink subclass, but it
>> may be achived with a Row instead?
>>
>> All the best
>>
>> François
>>
>> 2018-08-24 10:21 GMT+02:00 Timo Walther :
>>
>>> Hi,
>>>
>>> tuples are just a sub category of rows. Because the tuple arity is
>>> limited to 25 fields. I think the easiest solution would be to write your
>>> own converter that maps rows to tuples if you know that you will not need
>>> more than 25 fields. Otherwise it might be easier to just use a
>>> TextInputFormat and do the parsing yourself with a library.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> Am 23.08.18 um 18:54 schrieb françois lacombe:
>>>
>>> Hi all,

 I'm looking for best practices regarding Tuple instances creation.

 I have a TypeInformation object produced by AvroSchemaConverter.
 convertToTypeInfo("{...}");
 Is this possible to define a corresponding Tuple instance with it?
 (get the T from the TypeInformation)

 Example :
 {
   "type": "record",
   "fields": [
 { "name": "field1", "type": "int" },
 { "name": "field2", "type": "string"}
 ]}
 = Tuple2<Integer, String>

 The same question rises with DataSet or other any record handling class
 with parametrized types.

 My goal is to parse several CsvFiles with different structures
 described in an Avro schema.
 It would be great to not hard-code structures in my Java code and only
 get types information at runtime from Avro schemas

 Is this possible?

 Thanks in advance

 François Lacombe

>>>
>>>
>>>
>>


RE: What's the advantage of using BroadcastState?

2018-08-27 Thread Radu Tudoran
Hi Fabian,

Thanks for the blog post about broadcast state. I have a question with respect 
to the update capabilities of the broadcast state:

Assume you do whatever processing logic in the main processElement function,
and at a given context marker you 1) change a local field marker, to 2)
signal that the next time the broadcast function is triggered, a special pattern
should be created and broadcast.

My question is: is such a behavior allowed? Would the new special Pattern that 
originates in an operator be shared across the other instances of the 
KeyedProcessFunction?


public static class PatternEvaluator
    extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> {

  public boolean test = false;

  @Override
  public void processElement(
      Action action,
      ReadOnlyContext ctx,
      Collector<Tuple2<Long, Pattern>> out) throws Exception {

    // ...logic

    if (/* ...whatever context */) {
      test = true;
    }
  }

  @Override
  public void processBroadcastElement(
      Pattern pattern,
      Context ctx,
      Collector<Tuple2<Long, Pattern>> out) throws Exception {
    // store the new pattern by updating the broadcast state

    BroadcastState<Void, Pattern> bcState =
        ctx.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID,
            Types.POJO(Pattern.class)));
    // storing in MapState with null as VOID default value
    bcState.put(null, pattern);

    if (test) {
      bcState.put(null, new Pattern(test));
    }
  }
}


Dr. Radu Tudoran
Staff Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
German Research Center
Munich Office
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Monday, August 20, 2018 9:40 AM
To: Paul Lam 
Cc: Rong Rong ; Hequn Cheng ; user 

Subject: Re: What's the advantage of using BroadcastState?

Hi,

I've recently published a blog post about Broadcast State [1].

Cheers,
Fabian

[1] 
https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink

2018-08-20 3:58 GMT+02:00 Paul Lam 
mailto:paullin3...@gmail.com>>:
Hi Rong, Hequn

Your answers are very helpful! Thank you!

Best Regards,
Paul Lam


On Aug 19, 2018, at 23:30, Rong Rong <walter...@gmail.com> wrote:

Hi Paul,

To add to Hequn's answer. Broadcast state can typically be used as "a 
low-throughput stream containing a set of rules which we want to evaluate 
against all elements coming from another stream" [1]
So to add to the difference list is: whether it is "broadcast" across all keys 
if processing a keyed stream. This is typically when it is not possible to 
derive same key field using KeySelector in CoStream.
Another additional difference is performance: BroadcastStream is "stored 
locally and is used to process all incoming elements on the other stream" thus 
requires to carefully manage the size of the BroadcastStream.

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/broadcast_state.html

On Sun, Aug 19, 2018 at 1:40 AM Hequn Cheng 
mailto:chenghe...@gmail.com>> wrote:
Hi Paul,

There are some differences:
1. The BroadcastStream can broadcast data for you, i.e, data will be 
broadcasted to all downstream tasks automatically.
2. To guarantee that the contents in the Broadcast State are the same across 
all parallel instances of our operator, read-write access is only given to the 
broadcast side
3. For BroadcastState, flink guarantees that upon restoring/rescaling there 
will be no duplicates and no missing data. In case of recovery with the same or 
smaller parallelism, each task reads its checkpointed state. Upon scaling up, 
each task reads its own state, and the remaining tasks (p_new-p_old) read 
checkpoints of previous tasks in a round-robin manner. While MapState doesn't 
have such abilities.

Best, Hequn

On Sun, Aug 19, 2018 at 11:18 AM, Paul Lam 
mailto:paullin3...@gmail.com>> wrote:
Hi,

AFAIK, the difference between a BroadcastStream and a normal DataStream is that 
the BroadcastStream is with a BroadcastState, but it seems that the 
functionality of BroadcastS

withFormat(Csv) is undefined for the type BatchTableEnvironment

2018-08-27 Thread françois lacombe
Hi all,

I'm currently trying to load CSV file content with the Flink 1.6.0 Table API.
This error is raised when I try to execute the code written in the docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#csv-format

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
tEnv.withFormat(new Csv(...));

> Exception in thread "main" java.lang.Error: Unresolved compilation
problem:
   The method withFormat(Csv) is undefined for the type
BatchTableEnvironment

Am I wrong?
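
In case it helps later readers: in 1.6 withFormat() is called on the descriptor returned by connect(), not on the TableEnvironment itself. A rough sketch (the file path, field types and registered table name are placeholders, not from François's job):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

public class CsvConnectExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        tEnv.connect(new FileSystem().path("/tmp/input.csv"))  // connector first
            .withFormat(new Csv()                               // then the format
                .field("field1", Types.INT())
                .field("field2", Types.STRING())
                .fieldDelimiter(","))
            .withSchema(new Schema()                            // then the table schema
                .field("field1", Types.INT())
                .field("field2", Types.STRING()))
            .registerTableSource("csv_table");

        tEnv.scan("csv_table").printSchema();
    }
}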

Thanks in advance for any hint

François


Re: Raising a bug in Flink's unit test scripts

2018-08-27 Thread Fabian Hueske
Hi Averell,

If this is a more general error, I'd prefer a separate issue & PR.

Thanks,
Fabian

On Fri, Aug 24, 2018 at 13:15, Averell wrote:

> Good day everyone,
>
> I'm writing a unit test for the bug fix FLINK-9940, and found that some
> existing tests in flink-fs-tests cannot detect the issue where the file
> monitoring function emits duplicated files (i.e. the same file is reported
> multiple times).
> Could I just fix this as part of the FLINK-9940 bug fix, or do I have to
> raise a separate bug?
>
> Thanks and best regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: [External] Re: How to do test in Flink?

2018-08-27 Thread Chang Liu
Thanks Joe!

Best regards/祝好,

Chang Liu 刘畅
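
Since the test harnesses come up repeatedly below, a compact sketch of how one of them is typically wired up; the trivial flat map under test is made up for illustration, and in practice this needs the flink-streaming-java and flink-runtime test-jars discussed later in the thread:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;

public class FlatMapHarnessExample {

    // trivial function under test: emits the length of each incoming string
    static class LengthFlatMap implements FlatMapFunction<String, Integer> {
        @Override
        public void flatMap(String value, Collector<Integer> out) {
            out.collect(value.length());
        }
    }

    public static void main(String[] args) throws Exception {
        KeyedOneInputStreamOperatorTestHarness<String, String, Integer> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new StreamFlatMap<>(new LengthFlatMap()), // wrap the function in its operator
                        value -> value,                           // key selector
                        BasicTypeInfo.STRING_TYPE_INFO);          // key type

        harness.open();
        harness.processElement(new StreamRecord<>("flink", 1L)); // (value, timestamp)
        harness.processElement(new StreamRecord<>("test", 2L));

        System.out.println(harness.getOutput()); // emitted records and watermarks
        harness.close();
    }
}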



On Fri, Aug 24, 2018 at 6:55 PM Joe Malt  wrote:

> Hi Chang,
>
> A time-saving tip for finding which library contains a class: go to
> https://search.maven.org/
> and enter fc: followed by the fully-qualified name of the class. You
> should get the library as a search result.
>
> In this case for example, you'd search for
> fc:org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder
>
> Best,
>
> Joe Malt
> Engineering Intern, Stream Processing
> Yelp Inc.
>
> On Fri, Aug 24, 2018 at 4:50 AM, Chang Liu  wrote:
>
>> No worries, I found it here:
>>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-runtime_${scala.binary.version}</artifactId>
>>     <version>${flink.version}</version>
>>     <type>test-jar</type>
>>     <scope>test</scope>
>> </dependency>
>>
>>
>> Best regards/祝好,
>>
>> Chang Liu 刘畅
>>
>>
>>
>> On Fri, Aug 24, 2018 at 1:16 PM Chang Liu  wrote:
>>
>>> Hi Hequn,
>>>
>>> I have added the following dependencies:
>>>
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>>     <version>${flink.version}</version>
>>>     <type>test-jar</type>
>>>     <scope>test</scope>
>>> </dependency>
>>> <dependency>
>>>     <groupId>org.mockito</groupId>
>>>     <artifactId>mockito-core</artifactId>
>>>     <version>2.21.0</version>
>>>     <scope>test</scope>
>>> </dependency>
>>>
>>>
>>> But got the exception:   java.lang.NoClassDefFoundError:
>>> org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder
>>>
>>> Do you know which library contains this class? Thanks :)
>>>
>>> Best regards/祝好,
>>>
>>> Chang Liu 刘畅
>>> DevOps Engineer
>>> WB TECH / Cyber Crime Prevention Team
>>>
>>> Mobile: +31(0)687859981
>>> Email: fluency...@gmail.com  &  chang.l...@ing.nl
>>>
>>>
>>>
>>> On Mon, Aug 13, 2018 at 1:42 PM Hequn Cheng 
>>> wrote:
>>>
 Hi Change,

 Try
 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-streaming-java_2.11</artifactId>
     <version>${flink.version}</version>
     <type>test-jar</type>
     <scope>test</scope>
 </dependency>
 .

 On Mon, Aug 13, 2018 at 6:42 PM, Chang Liu 
 wrote:

> And another question: which library should I include in order to use
> these harnesses? I do have this flink-test-utils_2.11 in my pom, but I
> cannot find the harnesses.
>
> I also have the following in my pom:
>
>- flink-core
>- flink-clients_2.11
>- flink-scala_2.11
>- flink-streaming-java_2.11
>- flink-streaming-java_2.11
>- flink-connector-kafka-0.11_2.11
>
>
> Best regards/祝好,
>
> Chang Liu 刘畅
>
>
> On 13 Aug 2018, at 04:01, Hequn Cheng  wrote:
>
> Hi Chang,
>
> There are some harness tests which can be used to test your function.
> It is also a common way to test function or operator in flink internal
> tests. Currently, the harness classes mainly include:
>
>- KeyedOneInputStreamOperatorTestHarness
>- KeyedTwoInputStreamOperatorTestHarness
>- OneInputStreamOperatorTestHarness
>- TwoInputStreamOperatorTestHarness
>
> You can take a look at the source code of these classes.
>
> To be more specific, you can take a look at
> the testSlidingEventTimeWindowsApply[1], in which the RichSumReducer 
> window
> function has been tested.
>
> Best, Hequn
>
> [1]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java#L213
>
>
> On Mon, Aug 13, 2018 at 7:10 AM, Chang Liu 
> wrote:
>
>> Dear all,
>>
>> I have some questions regarding testing in Flink. The more general
>> question is: is there any guideline, template, or best practices that we
>> can follow if we want to test our flink code (more in scala)?
>>
>> I know there is this page:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/testing.html
>>  but
>> not so much written there. And I also did not find a more comprehensive
>> documentation of this library: flink-test-utils_2.11.
>>
>> One detailed question: how do you test the WindowFunction below? The
>> return type is Unit, right? We cannot unit test it the way the
>> ReduceFunction was tested in the example link above. So do we only have the
>> option of doing integration testing on it?
>> 
>>
>>
>> Your ideas would be very helpful :) Thanks in advance !
>>
>> Best regards/祝好,
>>
>> Chang Liu 刘畅
>>
>>
>>
>
>

>


Re: What's the advantage of using BroadcastState?

2018-08-27 Thread Xingcan Cui
Hi Radu,

I cannot fully understand your question, but I guess the answer is NO.

The broadcast state pattern just provides you with automatic data
broadcasting and a bunch of map states to cache the "low-throughput" patterns.
Also, to keep consistency, it forbids `processElement()` from modifying the
states. But this API does not really broadcast the states. You should keep the
logic of `processBroadcastElement()` deterministic.

 <identical broadcast input> + <deterministic processBroadcastElement()> = <identical broadcast state on every parallel instance> = <the effect of a "broadcast" state>

Best,
Xingcan

> On Aug 27, 2018, at 10:23 PM, Radu Tudoran  wrote:
> 
> Hi Fabian,
>  
> Thanks for the blog post about broadcast state. I have a question with 
> respect to the update capabilities of the broadcast state:
>  
> Assume you do whatever processing logic in the main processElement function, 
> and at a given point you 1) change a local field (a marker) in order to 
> 2) signal that the next time the broadcast function is triggered, a special 
> pattern should be created and broadcast.
>  
> My question is: is such a behavior allowed? Would the new special Pattern 
> that originates in an operator be shared across the other instances of the 
> KeyedProcessFunction?
>  
>  
> public static class PatternEvaluator
>     extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> {
> 
>   public boolean test = false;
>  
>   @Override
>   public void processElement(
>       Action action, 
>       ReadOnlyContext ctx, 
>       Collector<Tuple2<Long, Pattern>> out) throws Exception {
>  
>     // ... processing logic
>   
>     if (/* ... whatever context */) {
>       test = true;
>     }
>   }
> 
>   @Override
>   public void processBroadcastElement(
>       Pattern pattern, 
>       Context ctx, 
>       Collector<Tuple2<Long, Pattern>> out) throws Exception {
>     // store the new pattern by updating the broadcast state
>     BroadcastState<Void, Pattern> bcState =
>         ctx.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, 
>             Types.POJO(Pattern.class)));
>     // storing in MapState with null as VOID default value
>     bcState.put(null, pattern);
> 
>     if (test) {
>       bcState.put(null, new Pattern(test));
>     }
>   }
> }
>  
>  
> Dr. Radu Tudoran
> Staff Research Engineer - Big Data Expert
> IT R&D Division
>  
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> German Research Center
> Munich Office
> Riesstrasse 25, 80992 München
>  
> E-mail: radu.tudo...@huawei.com 
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>  
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com 
> 
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
>  
> From: Fabian Hueske [mailto:fhue...@gmail.com ] 
> Sent: Monday, August 20, 2018 9:40 AM
> To: Paul Lam mailto:paullin3...@gmail.com>>
> Cc: Rong Rong mailto:walter...@gmail.com>>; Hequn Cheng 
> mailto:chenghe...@gmail.com>>; user 
> mailto:user@flink.apache.org>>
> Subject: Re: What's the advantage of using BroadcastState?
>  
> Hi,
>  
> I've recently published a blog post about Broadcast State [1].
>  
> Cheers,
> Fabian
>  
> [1] 
> https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink
>  
> 
>  
> 2018-08-20 3:58 GMT+02:00 Paul Lam  >:
> Hi Rong, Hequn
>  
> Your answers are very helpful! Thank you!
>  
> Best Regards,
> Paul Lam
> 
> 
> 在 2018年8月19日,23:30,Rong Rong  > 写道:
>  
> Hi Paul,
>  
> To add to Hequn's answer. Broadcast state can typically be used as "a 
> low-throughput stream containing a set of rules which we want to evaluate 
> against all elements coming from another stream" [1] 
> So, to add to the list of differences: whether it is "broadcast" across all 
> keys when processing a keyed stream. This is typically used when it is not 
> possible to derive the same key field using a KeySelector on the co-stream.
> Another difference is performance: the BroadcastStream is "stored 
> locally and is used to process all incoming elements on the other stream" and 
> thus requires carefully managing the size of the BroadcastStream.
>  
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/broadcast_state.html
>  
> 

Difficulty managing keyed streams

2018-08-27 Thread Gabriel Pelielo | Stone
Hello everyone.


I'm currently developing a flink program to aggregate information about my 
company's clients' credit card transactions. Each transaction has a clientId 
and a transactionDate related to it. What I want to do is make a Sliding week 
time window with size 21 days sliding every 1 hour, comparing the hourly number 
of transactions of a client to their last week's number of transactions in the 
same hour, for example:


  *   Monday, August 13th 10:00 to 11:00 => 42 transactions.
  *   Monday, August 20th 10:00 to 11:00 => 20 transactions.
  *   Monday, August 27th 10:00 to 11:00 => 29 transactions.


My problem is that if a client does not make a single transaction for a whole 
hour, I don't get a 0 appended to my resulting list of transaction counts. What I 
need as the final result is a list containing 3 elements, each representing the 
count of transactions, plus the start and the end of the window, for example:


TransactionProfile(ClientId1, 2018-02-26T14:00, 2018-03-19T14:00, 15, 0, 10).


I have a keyed stream with the key being the clientId, and my idea to solve 
this problem was to append a 0 to the list of the clientIds that did not make 
any transactions in that hour whenever another clientId has finished its 
window, but I don't know how to achieve this. Any help would be appreciated.


I'm coding in Scala with Flink 1.4 and a piece of my code is the following:

val profileStream: DataStream[Profile] = streamFromKafka
  .map(openTransaction => Transaction(openTransaction.clientId, 
openTransaction.transactionDate))
  .keyBy(trx => trx.clientId)
  .window(SlidingEventWeekTimeWindows.of(Time.days(21), Time.hours(1)))
  .aggregate(new CountAggregate(), new TransactionProcessWindowFunction())


Best Regards,







Gabriel Pelielo
Fraud Detection

+ 55 21 98603 9725
gpeli...@stone.com.br

stone.com.br







Re: would you join a Slack workspace for Flink?

2018-08-27 Thread Nicos Maris
I agree with you Fabian.

The question then is how to instruct users to add code to their email. What
about the following? Where should it be placed?


If you send us an email with a code snippet, make sure that:

1. you do not link to files in external services as such files can change,
get deleted or the link might break and thus make an archived email thread
useless
2. you paste text instead of screenshots of text
3. you keep formatting when pasting code in order to keep the code readable
4. there are enough import statements to avoid ambiguities



On Mon, Aug 27, 2018 at 10:51 AM Fabian Hueske  wrote:

> Hi,
>
> I don't think that recommending Gists is a good idea.
> Sure, well formatted and highlighted code is nice and much better than
> posting screenshots but Gists can be deleted.
> Deleting a Gist would make an archived thread useless.
> I would definitely support instructions on how to add code to a mail.
>
> Regarding the overall topic of some kind of chat room.
> I would not participate in that.
> The Flink user mailing list is one of the most active user lists of the
> ASF.
> It's already quite challenging to keep up with the loads of mails.
> Adding a synchronous channel to that would make things worse for me.
> (That's probably also one of the reasons why the IRC channel is abandoned.)
> I can of course only speak for myself, but I would imagine that many
> members of the community who are helping out on the mailing list feel the
> same.
>
> Best,
> Fabian
>
> Am So., 26. Aug. 2018 um 15:17 Uhr schrieb Nicos Maris <
> nicos.ma...@gmail.com>:
>
>> Hi Dominik,
>>
>> I was writing about gitter just now :)
>>
>> If searchability is an issue, then indeed we could consider the free plan
>> of gitter: https://billing.gitter.im
>>
>>
>> In any case, we should instruct users who paste code snippets at the
>> mailing list to use http://gist.github.com
>>
>>
>> On Sun, Aug 26, 2018 at 4:13 PM Dominik Wosiński 
>> wrote:
>>
>>>
>>>
>>> -- Forwarded message -
>>> From: Dominik Wosiński 
>>> Date: niedz., 26 sie 2018 o 15:12
>>> Subject: ODP: would you join a Slack workspace for Flink?
>>> To: Hequn Cheng 
>>>
>>>
>>> Hey,
>>> I have been facing this issue for multiple open source projects and
>>> discussions. Slack in my opinion has two main issues :
>>>
>>>  - the already mentioned issue with searching, through
>>> search engine
>>>
>>>  - Slack is still commercial application.
>>>
>>> The second issue is quite important, because the free version of Slack
>>> keeps only 10k messages of history. I personally think that for Flink this
>>> would mean losing all messages older than, possibly, a week. This is a big
>>> issue as it would most certainly lead to asking the same questions over and
>>> over again. I’ve seen really big Slack groups for some big projects where
>>> the history would last like 3-4 days and this is a pure nightmare.
>>>
>>> The better solution would be to use Gitter rather than Slack IMHO, if there
>>> is a need for such a way of communication.
>>>
>>> Best Regards,
>>> Dominik.
>>>
>>>
>>>
>>> Wysłane z aplikacji Poczta
>>>  dla Windows 10
>>>
>>>
>>>
>>> *Od: *Hequn Cheng 
>>> *Wysłano: *niedziela, 26 sierpnia 2018 14:37
>>> *Do: *Nicos Maris 
>>> *DW: *ches...@apache.org; user 
>>> *Temat: *Re: would you join a Slack workspace for Flink?
>>>
>>>
>>>
>>> Hi Nicos,
>>>
>>>
>>>
>>> Thanks for bringing up this discussion. :-)
>>>
>>> Slack is a good way to communicate, but it does not seem to fit the
>>> open source field very well. The messages on Slack are mixed up and cannot be
>>> searched through a search engine.
>>>
>>>
>>>
>>> Best, Hequn
>>>
>>>
>>>
>>> On Sun, Aug 26, 2018 at 7:22 PM Nicos Maris 
>>> wrote:
>>>
>>> Chesnay can you take a look at the following PR?
>>>
>>>
>>>
>>> https://github.com/apache/flink-web/pull/120
>>>
>>>
>>>
>>> On Sun, Aug 26, 2018 at 1:09 PM Chesnay Schepler 
>>> wrote:
>>>
>>> There have been previous discussions around using slack and they were
>>> rejected.
>>>
>>> Personally I would just remove the IRC channel; I'm not aware of any
>>> committer actually spending time there.
>>>
>>> On 25.08.2018 17:07, Nicos Maris wrote:
>>>
>>>
>>>
>>> Hi all,
>>>
>>>
>>>
>>>
>>>
>>> This mailing list is for user support and questions. If you would also
>>> use slack for user support and questions, then please vote at the following
>>> ticket. If you don't have an account at that jira, you can reply to this
>>> email with a "+1".
>>>
>>>
>>>
>>>
>>>
>>> [FLINK-10217 ] use
>>> Slack for user support and questions
>>>  Current status
>>>
>>> For user support and questions, users are instructed to subscribe to
>>> user@flink.apache.org but there are users like me who enjoy using also
>>> a chat channel. However, the instructions to do so are not clear and the
>>> IRC activity is low and it is definitely not indicative of the
>>> project's activity
>>> 

Re: Kryo Serialization Issue

2018-08-27 Thread Darshan Singh
Thanks. We ran into different errors and then realized it was an OOM issue
which was causing different parts to fail.
Flink was buffering too much data as we were reading too fast from the source.
Reducing the read speed fixed the issue.

However, I am curious how to achieve the same with S3, apart from limiting
the number of files to read at the same time.
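
For reference, one generic way to cap the read rate (a rough sketch only;
ThrottleMap is a made-up helper built on Guava's RateLimiter, not something
Flink provides) is to chain a throttling map right after the source, so that
backpressure slows the reader down:

import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// made-up helper: lets at most `recordsPerSecond` elements through per subtask,
// which backpressures the chained source above it
public class ThrottleMap<T> extends RichMapFunction<T, T> {

    private final double recordsPerSecond;
    private transient RateLimiter limiter;   // not serializable, so created in open()

    public ThrottleMap(double recordsPerSecond) {
        this.recordsPerSecond = recordsPerSecond;
    }

    @Override
    public void open(Configuration parameters) {
        limiter = RateLimiter.create(recordsPerSecond);
    }

    @Override
    public T map(T value) {
        limiter.acquire();                   // blocks until a permit is available
        return value;
    }
}

// usage: stream.map(new ThrottleMap<>(1000.0))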

Thanks

On Sun, Aug 26, 2018 at 5:32 PM Rong Rong  wrote:

> This seems to be unrelated to the KryoSerializer issue from recent
> discussions [1], which has been fixed in 1.4.3, 1.5.0 and 1.6.0.
> At a quick glance, this might have been a corrupted message in your
> decoding, for example a malformed JSON string.
>
> --
> Rong
>
> [1] https://issues.apache.org/jira/browse/FLINK-8836
>
> On Wed, Aug 22, 2018 at 8:41 AM Darshan Singh 
> wrote:
>
>> Hi,
>>
>> I am using a map function on a data stream which has 1 column i.e. a json
>> string. Map function simply uses Jackson mapper and convert the String to
>> ObjectNode and also assign key based on one of the value in Object node.
>>
>> The code seems to work fine for 2-3 minutes as expected and then suddenly
>> it fails with below error. I looked at the mailing list and most of the
>> issues mentioned that it was fixed in 1.5.0, and I am using 1.6.0, so I am
>> not sure what needs to be done.
>>
>> Just wanted to know if we will need to write our own Serializer for
>> ObjectNode to fix this issue or there is some setting we are missing.
>>
>> Thanks
>>
>> java.lang.IndexOutOfBoundsException: Index: 49, Size: 0
>> at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>> at java.util.ArrayList.get(ArrayList.java:433)
>> at
>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> at
>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:207)
>> at
>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
>> at
>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
>> at
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>> at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>>
>>
>>


Re: would you join a Slack workspace for Flink?

2018-08-27 Thread Fabian Hueske
Hi Nicos,

That looks like a good start!
Would you like to open an issue and a pull request?

Thanks, Fabian

Nicos Maris  schrieb am Mo., 27. Aug. 2018, 17:49:

> I agree with you Fabian.
>
> The question then is how to instruct users to add code to their email.
> What about the following? Where should it be placed?
>
>
> If you send us an email with a code snippet, make sure that:
>
> 1. you do not link to files in external services as such files can change,
> get deleted or the link might break and thus make an archived email thread
> useless
> 2. you paste text instead of screenshots of text
> 3. you keep formatting when pasting code in order to keep the code readable
> 4. there are enough import statements to avoid ambiguities
>
>
>
> On Mon, Aug 27, 2018 at 10:51 AM Fabian Hueske  wrote:
>
>> Hi,
>>
>> I don't think that recommending Gists is a good idea.
>> Sure, well formatted and highlighted code is nice and much better than
>> posting screenshots but Gists can be deleted.
>> Deleting a Gist would make an archived thread useless.
>> I would definitely support instructions on how to add code to a mail.
>>
>> Regarding the overall topic of some kind of chat room.
>> I would not participate in that.
>> The Flink user mailing list is one of the most active user lists of the
>> ASF.
>> It's already quite challenging to keep up with the loads of mails.
>> Adding a synchronous channel to that would make things worse for me.
>> (That's probably also one of the reasons why the IRC channel is abandoned.)
>> I can of course only speak for myself, but I would imagine that many
>> members of the community who are helping out on the mailing list feel the
>> same.
>>
>> Best,
>> Fabian
>>
>> Am So., 26. Aug. 2018 um 15:17 Uhr schrieb Nicos Maris <
>> nicos.ma...@gmail.com>:
>>
>>> Hi Dominik,
>>>
>>> I was writing about gitter just now :)
>>>
>>> If searchability is an issue, then indeed we could consider the free
>>> plan of gitter: https://billing.gitter.im
>>>
>>>
>>> In any case, we should instruct users who paste code snippets at the
>>> mailing list to use http://gist.github.com
>>>
>>>
>>> On Sun, Aug 26, 2018 at 4:13 PM Dominik Wosiński 
>>> wrote:
>>>


 -- Forwarded message -
 From: Dominik Wosiński 
 Date: niedz., 26 sie 2018 o 15:12
 Subject: ODP: would you join a Slack workspace for Flink?
 To: Hequn Cheng 


 Hey,
 I have been facing this issue for multiple open source projects and
 discussions. Slack in my opinion has two main issues :

  - the already mentioned issue with searching, through
 search engine

  - Slack is still commercial application.

 The second issue is quite important, because the free version of Slack
 keeps only 10k messages of history. I personally think that for Flink this
 would mean losing all messages older than, possibly, a week. This is a big
 issue as it would most certainly lead to asking the same questions over and
 over again. I’ve seen really big Slack groups for some big projects where
 the history would last like 3-4 days and this is a pure nightmare.

 The better solution would be to use Gitter rather than Slack IMHO, if there
 is a need for such a way of communication.

 Best Regards,
 Dominik.



 Wysłane z aplikacji Poczta
  dla Windows 10



 *Od: *Hequn Cheng 
 *Wysłano: *niedziela, 26 sierpnia 2018 14:37
 *Do: *Nicos Maris 
 *DW: *ches...@apache.org; user 
 *Temat: *Re: would you join a Slack workspace for Flink?



 Hi Nicos,



 Thanks for bringing up this discussion. :-)

 Slack is a good way to communicate, but it does not seem to fit the
 open source field very well. The messages on Slack are mixed up and cannot be
 searched through a search engine.



 Best, Hequn



 On Sun, Aug 26, 2018 at 7:22 PM Nicos Maris 
 wrote:

 Chesnay can you take a look at the following PR?



 https://github.com/apache/flink-web/pull/120



 On Sun, Aug 26, 2018 at 1:09 PM Chesnay Schepler 
 wrote:

 There have been previous discussions around using slack and they were
 rejected.

 Personally I would just remove the IRC channel; I'm not aware of any
 committer actually spending time there.

 On 25.08.2018 17:07, Nicos Maris wrote:



 Hi all,





 This mailing list is for user support and questions. If you would also
 use slack for user support and questions, then please vote at the following
 ticket. If you don't have an account at that jira, you can reply to this
 email with a "+1".





 [FLINK-10217 ] use
 Slack for user support and questions
  Current

Re: JobGraphs not cleaned up in HA mode

2018-08-27 Thread vino yang
Hi Encho,

This is a problem already known to the Flink community, you can track its
progress through FLINK-10011[1], and currently Till is fixing this issue.

[1]: https://issues.apache.org/jira/browse/FLINK-10011

Thanks, vino.

Encho Mishinev  于2018年8月27日周一 下午10:13写道:

> I am running Flink 1.5.3 with two job managers and two task managers in
> Kubernetes along with HDFS and Zookeeper in high-availability mode.
>
> My problem occurs after the following actions:
> - Upload a .jar file to jobmanager-1
> - Run a streaming job from the jar on jobmanager-1
> - Wait for 1 or 2 checkpoints to succeed
> - Kill pod of jobmanager-1
> After a short delay, jobmanager-2 takes leadership and correctly restores
> the job and continues it
> - Stop job from jobmanager-2
>
> At this point all seems well, but the problem is that jobmanager-2 does
> not clean up anything that was left from jobmanager-1. This means that both
> in HDFS and in Zookeeper remain job graphs, which later on obstruct any
> work of both managers as after any reset they unsuccessfully try to restore
> a non-existent job and fail over and over again.
>
> I am quite certain that jobmanager-2 does not know about any of
> jobmanager-1’s files since the Zookeeper logs reveal that it tries to
> duplicate job folders:
>
> 2018-08-27 13:11:00,038 [myid:] - INFO  [ProcessThread(sid:0
> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
> when processing sessionid:0x1657aa15e480033 type:create cxid:0x46
> zxid:0x1ab txntype:-1 reqpath:n/a Error
> Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77
> Error:KeeperErrorCode = NodeExists for
> /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77
>
> 2018-08-27 13:11:02,296 [myid:] - INFO  [ProcessThread(sid:0
> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
> when processing sessionid:0x1657aa15e480033 type:create cxid:0x5c
> zxid:0x1ac txntype:-1 reqpath:n/a Error
> Path:/flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15
> Error:KeeperErrorCode = NodeExists for
> /flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15
>
> Also jobmanager-2 attempts to delete the jobgraphs folder in Zookeeper
> when the job is stopped, but fails since there are leftover files in it
> from jobmanager-1:
>
> 2018-08-27 13:12:13,406 [myid:] - INFO  [ProcessThread(sid:0
> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
> when processing sessionid:0x1657aa15e480033 type:delete cxid:0xa8
> zxid:0x1bd txntype:-1 reqpath:n/a Error
> Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15
> Error:KeeperErrorCode = Directory not empty for
> /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15
>
> I’ve noticed that when restoring the job, it seems like jobmanager-2 does
> not get anything more than jobID, while it perhaps needs some metadata?
> Here is the log that seems suspicious to me:
>
> 2018-08-27 13:09:18,113 INFO
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
> Recovered SubmittedJobGraph(83bfa359ca59ce1d4635e18e16651e15, null).
>
> All other logs seem fine in jobmanager-2, it doesn’t seem to be aware that
> it’s overwriting anything or not deleting properly.
>
> My question is - what is the intended way for the job managers to
> correctly exchange metadata in HA mode and why is it not working for me?
>
> Thanks in advance!


Re: JobGraphs not cleaned up in HA mode

2018-08-27 Thread vino yang
About some implementation mechanisms.
Flink uses Zookeeper to store JobGraph (Job's description information and
metadata) as a basis for Job recovery.
However, previous implementations may cause this information to not be
properly cleaned up because it is asynchronously deleted by a background
thread.

Thanks, vino.

vino yang  于2018年8月28日周二 上午9:49写道:

> Hi Encho,
>
> This is a problem already known to the Flink community, you can track its
> progress through FLINK-10011[1], and currently Till is fixing this issue.
>
> [1]: https://issues.apache.org/jira/browse/FLINK-10011
>
> Thanks, vino.
>
> Encho Mishinev  于2018年8月27日周一 下午10:13写道:
>
>> I am running Flink 1.5.3 with two job managers and two task managers in
>> Kubernetes along with HDFS and Zookeeper in high-availability mode.
>>
>> My problem occurs after the following actions:
>> - Upload a .jar file to jobmanager-1
>> - Run a streaming job from the jar on jobmanager-1
>> - Wait for 1 or 2 checkpoints to succeed
>> - Kill pod of jobmanager-1
>> After a short delay, jobmanager-2 takes leadership and correctly restores
>> the job and continues it
>> - Stop job from jobmanager-2
>>
>> At this point all seems well, but the problem is that jobmanager-2 does
>> not clean up anything that was left from jobmanager-1. This means that both
>> in HDFS and in Zookeeper remain job graphs, which later on obstruct any
>> work of both managers as after any reset they unsuccessfully try to restore
>> a non-existent job and fail over and over again.
>>
>> I am quite certain that jobmanager-2 does not know about any of
>> jobmanager-1’s files since the Zookeeper logs reveal that it tries to
>> duplicate job folders:
>>
>> 2018-08-27 13:11:00,038 [myid:] - INFO  [ProcessThread(sid:0
>> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
>> when processing sessionid:0x1657aa15e480033 type:create cxid:0x46
>> zxid:0x1ab txntype:-1 reqpath:n/a Error
>> Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77
>> Error:KeeperErrorCode = NodeExists for
>> /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77
>>
>> 2018-08-27 13:11:02,296 [myid:] - INFO  [ProcessThread(sid:0
>> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
>> when processing sessionid:0x1657aa15e480033 type:create cxid:0x5c
>> zxid:0x1ac txntype:-1 reqpath:n/a Error
>> Path:/flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15
>> Error:KeeperErrorCode = NodeExists for
>> /flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15
>>
>> Also jobmanager-2 attempts to delete the jobgraphs folder in Zookeeper
>> when the job is stopped, but fails since there are leftover files in it
>> from jobmanager-1:
>>
>> 2018-08-27 13:12:13,406 [myid:] - INFO  [ProcessThread(sid:0
>> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
>> when processing sessionid:0x1657aa15e480033 type:delete cxid:0xa8
>> zxid:0x1bd txntype:-1 reqpath:n/a Error
>> Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15
>> Error:KeeperErrorCode = Directory not empty for
>> /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15
>>
>> I’ve noticed that when restoring the job, it seems like jobmanager-2 does
>> not get anything more than jobID, while it perhaps needs some metadata?
>> Here is the log that seems suspicious to me:
>>
>> 2018-08-27 13:09:18,113 INFO
>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>> Recovered SubmittedJobGraph(83bfa359ca59ce1d4635e18e16651e15, null).
>>
>> All other logs seem fine in jobmanager-2, it doesn’t seem to be aware
>> that it’s overwriting anything or not deleting properly.
>>
>> My question is - what is the intended way for the job managers to
>> correctly exchange metadata in HA mode and why is it not working for me?
>>
>> Thanks in advance!
>
>


Re: Difficulty managing keyed streams

2018-08-27 Thread vino yang
Hi Gabriel,

In your scenario, I guess you are working with event time.
In this case, I think you can implement self-triggering by customizing the
window's trigger and then combining it with a ProcessWindowFunction [1] to define
your calculation logic, because most of your timing is based on watermarks, and
very few cases require timed triggers.

A similar example is here.[2]

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#processwindowfunction
[2]:
https://stackoverflow.com/questions/47059762/apache-flink-send-event-if-no-data-was-received-for-x-minutes
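
To make the timer-based idea behind [2] concrete, here is a rough sketch in Java
(Transaction and HourlyCount are placeholder types standing in for your own case
classes, and the one-hour granularity is an assumption). A plain ProcessFunction
applied after keyBy() keeps a per-client counter and fires a timer at every hour
boundary, so an hour without transactions still produces an explicit 0:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class HourlyCountWithZeros extends ProcessFunction<Transaction, HourlyCount> {

    private static final long HOUR = 60 * 60 * 1000L;

    private transient ValueState<Long> count;      // transactions seen in the current hour
    private transient ValueState<String> clientId; // remembered key for onTimer()

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        clientId = getRuntimeContext().getState(new ValueStateDescriptor<>("clientId", String.class));
    }

    @Override
    public void processElement(Transaction trx, Context ctx, Collector<HourlyCount> out) throws Exception {
        clientId.update(trx.clientId);
        count.update(count.value() == null ? 1L : count.value() + 1L);
        // one timer per hour boundary; registering the same timestamp twice is a no-op
        ctx.timerService().registerEventTimeTimer((ctx.timestamp() / HOUR + 1) * HOUR);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<HourlyCount> out) throws Exception {
        long c = count.value() == null ? 0L : count.value();
        out.collect(new HourlyCount(clientId.value(), timestamp - HOUR, timestamp, c));
        count.clear();
        // schedule the next boundary so an empty hour still emits a 0;
        // in practice you would stop doing this after some period of inactivity
        ctx.timerService().registerEventTimeTimer(timestamp + HOUR);
    }
}

// usage on the keyed stream: transactions.keyBy(trx -> trx.clientId).process(new HourlyCountWithZeros())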

Thanks, vino.


Gabriel Pelielo | Stone  于2018年8月27日周一 下午11:29写道:

> Hello everyone.
>
>
> I'm currently developing a flink program to aggregate information about my
> company's clients' credit card transactions. Each transaction has a
> clientId and a transactionDate related to it. What I want to do is make a
> Sliding week time window with size 21 days sliding every 1 hour, comparing
> the hourly number of transactions of a client to their last week's number
> of transactions in the same hour, for example:
>
>
>
>- Monday, August 13th 10:00 to 11:00 => 42 transactions.
>- Monday, August 20th 10:00 to 11:00 => 20 transactions.
>- Monday, August 27th 10:00 to 11:00 => 29 transactions.
>
>
> My problem is that if a client does not make a single transaction for a
> whole hour, I don't get a 0 appended to my resulting list of transaction.
> What I need as the final result is a list containing 3 elements, each
> representing the count of transactions, the start and the end of the
> window, for example:
>
>
> *TransactionProfile(ClientId1, 2018-02-26T14:00, 2018-03-19T14:00, 15, 0,
> 10).*
>
>
> I have a keyed stream with the key being the clientId and my ideia to
> solve this problem was to append a 0 to the list of the clientIds that did
> not make any transactions on that hour whenever another clientId has
> finished its window, but I don't know how to achieve this. Any help would
> be appreciated.
>
>
> I'm coding in Scala with Flink 1.4 and a piece of my code is the following:
>
> val profileStream: DataStream[Profile] = streamFromKafka
>   .map(openTransaction => Transaction(openTransaction.clientId, 
> openTransaction.transactionDate))
>   .keyBy(trx => trx.clientId)
>   .window(SlidingEventWeekTimeWindows.of(Time.days(21), Time.hours(1)))
>   .aggregate(new CountAggregate(), new TransactionProcessWindowFunction())
>
>
> Best Regards,
>
>
>
>
>
>
> * Gabriel Pelielo*
> Fraud Detection
>
> + 55 21 98603 9725
> gpeli...@stone.com.br
> stone.com.br 
>
>
>
>


Re: withFormat(Csv) is undefined for the type BatchTableEnvironment

2018-08-27 Thread vino yang
Hi Francois,

Yes, the withFormat API comes from an instance of BatchTableDescriptor, and
the BatchTableDescriptor instance is returned by the connect API, so you
should call BatchTableEnvironment#connect first.
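
For example (the file path, field names and types below are only placeholders),
the descriptor chain in 1.6 looks roughly like this:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

tEnv.connect(new FileSystem().path("file:///tmp/input.csv"))   // connector descriptor
    .withFormat(new Csv()                                      // format descriptor
        .fieldDelimiter(",")
        .field("name", Types.STRING)
        .field("score", Types.INT))
    .withSchema(new Schema()                                   // table schema
        .field("name", Types.STRING)
        .field("score", Types.INT))
    .registerTableSource("CsvTable");

Table result = tEnv.scan("CsvTable");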

Thanks, vino.

françois lacombe  于2018年8月27日周一 下午10:26写道:

> Hi all,
>
> I'm currently trying to load a CSV file's content with the Flink 1.6.0 Table API.
> This error is raised as I try to execute the code written in the docs:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#csv-format
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
> tEnv.withFormat(new Csv(...));
>
> > Exception in thread "main" java.lang.Error: Unresolved compilation
> problem:
>The method withFormat(Csv) is undefined for the type
> BatchTableEnvironment
>
> Am I wrong?
>
> Thanks in advance for any hint
>
> François
>


Re: JobGraphs not cleaned up in HA mode

2018-08-27 Thread Encho Mishinev
Thank you very much for the info! Will keep track of the progress.

In the meantime is there any viable workaround? It seems like HA doesn't
really work due to this bug.

On Tue, Aug 28, 2018 at 4:52 AM vino yang  wrote:

> About some implementation mechanisms.
> Flink uses Zookeeper to store JobGraph (Job's description information and
> metadata) as a basis for Job recovery.
> However, previous implementations may cause this information to not be
> properly cleaned up because it is asynchronously deleted by a background
> thread.
>
> Thanks, vino.
>
> vino yang  于2018年8月28日周二 上午9:49写道:
>
>> Hi Encho,
>>
>> This is a problem already known to the Flink community, you can track its
>> progress through FLINK-10011[1], and currently Till is fixing this issue.
>>
>> [1]: https://issues.apache.org/jira/browse/FLINK-10011
>>
>> Thanks, vino.
>>
>> Encho Mishinev  于2018年8月27日周一 下午10:13写道:
>>
>>> I am running Flink 1.5.3 with two job managers and two task managers in
>>> Kubernetes along with HDFS and Zookeeper in high-availability mode.
>>>
>>> My problem occurs after the following actions:
>>> - Upload a .jar file to jobmanager-1
>>> - Run a streaming job from the jar on jobmanager-1
>>> - Wait for 1 or 2 checkpoints to succeed
>>> - Kill pod of jobmanager-1
>>> After a short delay, jobmanager-2 takes leadership and correctly
>>> restores the job and continues it
>>> - Stop job from jobmanager-2
>>>
>>> At this point all seems well, but the problem is that jobmanager-2 does
>>> not clean up anything that was left from jobmanager-1. This means that both
>>> in HDFS and in Zookeeper remain job graphs, which later on obstruct any
>>> work of both managers as after any reset they unsuccessfully try to restore
>>> a non-existent job and fail over and over again.
>>>
>>> I am quite certain that jobmanager-2 does not know about any of
>>> jobmanager-1’s files since the Zookeeper logs reveal that it tries to
>>> duplicate job folders:
>>>
>>> 2018-08-27 13:11:00,038 [myid:] - INFO  [ProcessThread(sid:0
>>> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
>>> when processing sessionid:0x1657aa15e480033 type:create cxid:0x46
>>> zxid:0x1ab txntype:-1 reqpath:n/a Error
>>> Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77
>>> Error:KeeperErrorCode = NodeExists for
>>> /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77
>>>
>>> 2018-08-27 13:11:02,296 [myid:] - INFO  [ProcessThread(sid:0
>>> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
>>> when processing sessionid:0x1657aa15e480033 type:create cxid:0x5c
>>> zxid:0x1ac txntype:-1 reqpath:n/a Error
>>> Path:/flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15
>>> Error:KeeperErrorCode = NodeExists for
>>> /flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15
>>>
>>> Also jobmanager-2 attempts to delete the jobgraphs folder in Zookeeper
>>> when the job is stopped, but fails since there are leftover files in it
>>> from jobmanager-1:
>>>
>>> 2018-08-27 13:12:13,406 [myid:] - INFO  [ProcessThread(sid:0
>>> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
>>> when processing sessionid:0x1657aa15e480033 type:delete cxid:0xa8
>>> zxid:0x1bd txntype:-1 reqpath:n/a Error
>>> Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15
>>> Error:KeeperErrorCode = Directory not empty for
>>> /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15
>>>
>>> I’ve noticed that when restoring the job, it seems like jobmanager-2
>>> does not get anything more than jobID, while it perhaps needs some
>>> metadata? Here is the log that seems suspicious to me:
>>>
>>> 2018-08-27 13:09:18,113 INFO
>>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>>> Recovered SubmittedJobGraph(83bfa359ca59ce1d4635e18e16651e15, null).
>>>
>>> All other logs seem fine in jobmanager-2, it doesn’t seem to be aware
>>> that it’s overwriting anything or not deleting properly.
>>>
>>> My question is - what is the intended way for the job managers to
>>> correctly exchange metadata in HA mode and why is it not working for me?
>>>
>>> Thanks in advance!
>>
>>


Re: JobGraphs not cleaned up in HA mode

2018-08-27 Thread vino yang
Hi Encho,

As a temporary workaround, you can determine whether a job has been cleaned up
by monitoring its specific JobID under ZooKeeper's "/jobgraphs" node.
Another option is to modify the source code and crudely change the cleanup to a
synchronous form, but Flink's operations on the ZooKeeper paths need to obtain
the corresponding lock, so doing this is dangerous and not recommended.
I think this problem can be solved in the next version. It depends on Till.
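
For the first workaround, a rough sketch (the ZooKeeper connect string and the
/flink/default prefix are taken from the logs above and will differ per setup;
Curator is used here as the ZooKeeper client) could look like this:

import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class JobGraphZNodeCheck {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "zookeeper:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        // each child of the jobgraphs node is a job id that still has a job graph registered;
        // an entry that survives a cancelled job is the leftover state described in this thread
        List<String> jobIds = client.getChildren().forPath("/flink/default/jobgraphs");
        for (String jobId : jobIds) {
            System.out.println("job graph still present for: " + jobId);
        }
        client.close();
    }
}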

Thanks, vino.

Encho Mishinev  于2018年8月28日周二 下午1:17写道:

> Thank you very much for the info! Will keep track of the progress.
>
> In the meantime is there any viable workaround? It seems like HA doesn't
> really work due to this bug.
>
> On Tue, Aug 28, 2018 at 4:52 AM vino yang  wrote:
>
>> About some implementation mechanisms.
>> Flink uses Zookeeper to store JobGraph (Job's description information and
>> metadata) as a basis for Job recovery.
>> However, previous implementations may cause this information to not be
>> properly cleaned up because it is asynchronously deleted by a background
>> thread.
>>
>> Thanks, vino.
>>
>> vino yang  于2018年8月28日周二 上午9:49写道:
>>
>>> Hi Encho,
>>>
>>> This is a problem already known to the Flink community, you can track
>>> its progress through FLINK-10011[1], and currently Till is fixing this
>>> issue.
>>>
>>> [1]: https://issues.apache.org/jira/browse/FLINK-10011
>>>
>>> Thanks, vino.
>>>
>>> Encho Mishinev  于2018年8月27日周一 下午10:13写道:
>>>
 I am running Flink 1.5.3 with two job managers and two task managers in
 Kubernetes along with HDFS and Zookeeper in high-availability mode.

 My problem occurs after the following actions:
 - Upload a .jar file to jobmanager-1
 - Run a streaming job from the jar on jobmanager-1
 - Wait for 1 or 2 checkpoints to succeed
 - Kill pod of jobmanager-1
 After a short delay, jobmanager-2 takes leadership and correctly
 restores the job and continues it
 - Stop job from jobmanager-2

 At this point all seems well, but the problem is that jobmanager-2 does
 not clean up anything that was left from jobmanager-1. This means that both
 in HDFS and in Zookeeper remain job graphs, which later on obstruct any
 work of both managers as after any reset they unsuccessfully try to restore
 a non-existent job and fail over and over again.

 I am quite certain that jobmanager-2 does not know about any of
 jobmanager-1’s files since the Zookeeper logs reveal that it tries to
 duplicate job folders:

 2018-08-27 13:11:00,038 [myid:] - INFO  [ProcessThread(sid:0
 cport:2181)::PrepRequestProcessor@648] - Got user-level
 KeeperException when processing sessionid:0x1657aa15e480033 type:create
 cxid:0x46 zxid:0x1ab txntype:-1 reqpath:n/a Error
 Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77
 Error:KeeperErrorCode = NodeExists for
 /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77

 2018-08-27 13:11:02,296 [myid:] - INFO  [ProcessThread(sid:0
 cport:2181)::PrepRequestProcessor@648] - Got user-level
 KeeperException when processing sessionid:0x1657aa15e480033 type:create
 cxid:0x5c zxid:0x1ac txntype:-1 reqpath:n/a Error
 Path:/flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15
 Error:KeeperErrorCode = NodeExists for
 /flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15

 Also jobmanager-2 attempts to delete the jobgraphs folder in Zookeeper
 when the job is stopped, but fails since there are leftover files in it
 from jobmanager-1:

 2018-08-27 13:12:13,406 [myid:] - INFO  [ProcessThread(sid:0
 cport:2181)::PrepRequestProcessor@648] - Got user-level
 KeeperException when processing sessionid:0x1657aa15e480033 type:delete
 cxid:0xa8 zxid:0x1bd txntype:-1 reqpath:n/a Error
 Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15
 Error:KeeperErrorCode = Directory not empty for
 /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15

 I’ve noticed that when restoring the job, it seems like jobmanager-2
 does not get anything more than jobID, while it perhaps needs some
 metadata? Here is the log that seems suspicious to me:

 2018-08-27 13:09:18,113 INFO
 org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
 Recovered SubmittedJobGraph(83bfa359ca59ce1d4635e18e16651e15, null).

 All other logs seem fine in jobmanager-2, it doesn’t seem to be aware
 that it’s overwriting anything or not deleting properly.

 My question is - what is the intended way for the job managers to
 correctly exchange metadata in HA mode and why is it not working for me?

 Thanks in advance!
>>>
>>>


When will Flink support kafka1.0

2018-08-27 Thread spoon_lz
Some other departments within the company have adopted Kafka 1.0. I have seen
that Flink currently supports 0.9, 0.10 and 0.11. When will Kafka 1.0 be
supported?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/