Re: When should the RETAIN_ON_CANCELLATION option be used?

2018-09-24 Thread 徐涛
Hi Vino,
What is the definition of, and the difference between, a job cancellation and a job failure?
Can I say that if the program is shut down manually, it is a job cancellation,
and if the program is shut down due to some error, it is a job failure?

This is important because it is the prerequisite for the following 
question:

The Flink 1.6 documentation says:
"ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: Retain the 
checkpoint when the job is cancelled. Note that you have to manually clean up 
the checkpoint state after cancellation in this case.
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: Delete the 
checkpoint when the job is cancelled. The checkpoint state will only be 
available if the job fails."
But it does not say whether the checkpoint will be retained on failure.
If checkpoints are handled the same way on failure as on cancellation, then I have 
to use RETAIN_ON_CANCELLATION, because otherwise the checkpoint will 
be deleted when the job fails.
If the checkpoint is not deleted on failure, then DELETE_ON_CANCELLATION is 
safe when the job fails.

Best 
Henry   



> On Sep 25, 2018, at 11:16 AM, vino yang wrote:
> 
> Hi Henry,
> 
> Answer your question:
> 
> What is the definition and difference between job cancel and job fails?
> 
> > The cancellation and failure of the job will cause the job to enter the 
> > termination state. But cancellation is artificially triggered and normally 
> > terminated, while failure is usually a passive termination due to an 
> > exception.
> 
> If I use DELETE_ON_CANCELLATION option, in this case, does I have the 
> checkpoint to resume the program?
> 
> > No, if you use externalized checkpoints. you cannot resume from 
> > externalized checkpoints after the job has been cancelled.
> 
> I mean if I can guarantee that a savepoint can always be made before manually 
> cancelation. If I use DELETE_ON_CANCELLATION option on checkpoints, is there 
> any probability that I do not have a checkpoint to recover from?
> 
> > From the latest source code, savepoint is not affected by 
> > CheckpointRetentionPolicy, it needs to be cleaned up manually.
> 
> Thanks, vino.
> 
> 徐涛 <happydexu...@gmail.com> wrote on Tue, Sep 25, 2018 at 11:06 AM:
> Hi All,
>   I mean if I can guarantee that a savepoint can always be made before 
> manually cancelation. If I use DELETE_ON_CANCELLATION option on checkpoints, 
> is there any probability that I do not have a checkpoint to recover from?
>   Thank a a lot.
> 
> Best
> Henry
> 
> 
> 
>> On Sep 25, 2018, at 10:41 AM, 徐涛 wrote:
>> 
>> Hi All,
>>  In flink document, it says
>>  DELETE_ON_CANCELLATION: “Delete the checkpoint when the job is 
>> cancelled. The checkpoint state will only be available if the job fails.”
>>  What is the definition and difference between job cancel and job fails? 
>> If I run the program on yarn, and after a few days, the yarn application get 
>> failed for some reason.
>>  If I use DELETE_ON_CANCELLATION option, in this case, does I have the 
>> checkpoint to resume the program?
>> 
>>  If the checkpoint are only deleted when I cancel the program, I can 
>> always make the savepoint before cancelation. Then it seems that I can only 
>> set DELETE_ON_CANCELLATION then.
>>  I can not find a case that RETAIN_ON_CANCELLATION should be used.
>>  
>> 
>> Best
>> Henry
>> 
> 



Re: When should the RETAIN_ON_CANCELLATION option be used?

2018-09-24 Thread vino yang
Hi Henry,

To answer your questions:

What is the definition of, and the difference between, a job cancellation and a job failure?

> Both cancellation and failure cause the job to enter a terminal
state. Cancellation is triggered manually and the job terminates normally,
while failure is usually a passive termination caused by an
exception.

If I use the DELETE_ON_CANCELLATION option, do I have a
checkpoint to resume the program from in this case?

> No. If you use externalized checkpoints with DELETE_ON_CANCELLATION, you
cannot resume from them after the job has been cancelled.

What I mean is: suppose I can guarantee that a savepoint is always taken before a
manual cancellation. If I use the DELETE_ON_CANCELLATION option for
checkpoints, is there any chance that I end up without a checkpoint to
recover from?

> According to the latest source code, savepoints are not affected by
CheckpointRetentionPolicy; they need to be cleaned up manually.
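The cleanup rules discussed in this thread can be summarised in a small model. The sketch below is plain Java and does not use any Flink classes; the names RetentionModel, Cleanup and Termination are hypothetical. It only encodes the two documentation sentences quoted in the thread, plus the assumption that RETAIN_ON_CANCELLATION also keeps the checkpoint when the job fails.

```java
// Toy model of the documented cleanup rules; not Flink's implementation.
final class RetentionModel {
    enum Cleanup { RETAIN_ON_CANCELLATION, DELETE_ON_CANCELLATION }

    enum Termination { CANCELLED, FAILED }

    // Returns true if the latest externalized checkpoint is still available
    // after the job has terminated in the given way.
    static boolean checkpointAvailable(Cleanup cleanup, Termination termination) {
        if (termination == Termination.FAILED) {
            // Quoted docs: with DELETE_ON_CANCELLATION the state is "only
            // available if the job fails". Assumption: RETAIN keeps it too.
            return true;
        }
        // On cancellation only RETAIN_ON_CANCELLATION keeps the checkpoint.
        return cleanup == Cleanup.RETAIN_ON_CANCELLATION;
    }

    public static void main(String[] args) {
        // Print the full decision table.
        for (Cleanup c : Cleanup.values()) {
            for (Termination t : Termination.values()) {
                System.out.println(c + " + " + t + " -> " + checkpointAvailable(c, t));
            }
        }
    }
}
```

Under this model the two options differ only when the job is cancelled, so RETAIN_ON_CANCELLATION would matter mainly when a job may be cancelled without a savepoint having been taken first.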

Thanks, vino.

徐涛 wrote on Tue, Sep 25, 2018 at 11:06 AM:

> Hi All,
> What I mean is: suppose I can guarantee that a savepoint is always taken before a
> manual cancellation. If I use the DELETE_ON_CANCELLATION option for
> checkpoints, is there any chance that I end up without a checkpoint to
> recover from?
> Thanks a lot.
>
> Best
> Henry
>
>
>
> On Sep 25, 2018, at 10:41 AM, 徐涛 wrote:
>
> Hi All,
> In flink document, it says
> DELETE_ON_CANCELLATION: “Delete the checkpoint when the job is cancelled.
> The checkpoint state will only be available if the job fails.”
> What is the definition and difference between job cancel and job fails? If
> I run the program on yarn, and after a few days, the yarn application get
> failed for some reason.
> If I use DELETE_ON_CANCELLATION option, in this case, does I have the
> checkpoint to resume the program?
>
> If the checkpoint are *only* deleted when I cancel the program, I can
> always make the savepoint before cancelation. Then it seems that I can
> *only* set DELETE_ON_CANCELLATION then.
> I can not find a case that RETAIN_ON_CANCELLATION should be used.
>
> Best
> Henry
>
>
>


Re: How to get the location of keytab when using flink on yarn

2018-09-24 Thread Rong Rong
Hi

Just a quick thought on this:
You might be able to use a delegation token to access HBase [1]. That might be
more secure than distributing your keytab to all the YARN
nodes.

Hope this helps.

--
Rong

[1] https://wiki.apache.org/hadoop/Hbase/HBaseTokenAuthentication

On Mon, Sep 24, 2018 at 7:51 PM sanmutongzi  wrote:

> Hi Aljoscha,
> Sorry for my late response . According to my experience , if the
> flink-conf.yaml has set the "security.kerberos.login.keytab" and
> "security.kerberos.login.contexts" with a kerberos file then yarn will
> ship the keytab file to the TaskManager .
> Also i can find the log like:
>  " INFO  org.apache.flink.configuration.GlobalConfiguration-
> Loading configuration property: security.kerberos.login.keytab,
>
> /data1/yarn/nm/usercache/hadoop/appcache/application_1537515506704_0007/container_e28_1537515506704_0007_01_01/krb5.keytab"
> in the TaskManager log.
> My problem is that in the user code like map or sink function how can i get
> the security.kerberos.login.keytab value  for login .
>
> THANKS
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: When should the RETAIN_ON_CANCELLATION option be used?

2018-09-24 Thread 徐涛
Hi All,
What I mean is: suppose I can guarantee that a savepoint is always taken before a 
manual cancellation. If I use the DELETE_ON_CANCELLATION option for checkpoints, is 
there any chance that I end up without a checkpoint to recover from?
Thanks a lot.

Best
Henry



> On Sep 25, 2018, at 10:41 AM, 徐涛 wrote:
> 
> Hi All,
>   In flink document, it says
>   DELETE_ON_CANCELLATION: “Delete the checkpoint when the job is 
> cancelled. The checkpoint state will only be available if the job fails.”
>   What is the definition and difference between job cancel and job fails? 
> If I run the program on yarn, and after a few days, the yarn application get 
> failed for some reason.
>   If I use DELETE_ON_CANCELLATION option, in this case, does I have the 
> checkpoint to resume the program?
> 
>   If the checkpoint are only deleted when I cancel the program, I can 
> always make the savepoint before cancelation. Then it seems that I can only 
> set DELETE_ON_CANCELLATION then.
>   I can not find a case that RETAIN_ON_CANCELLATION should be used.
>   
> 
> Best
> Henry
> 



Re: 1.5 Checkpoint metadata location

2018-09-24 Thread vino yang
Hi Bryant,

Maybe Stefan can answer your question; I have pinged him for you.

Thanks, vino.

Bryant Baltes wrote on Tue, Sep 25, 2018 at 12:29 AM:

> Hi All,
>
> After upgrading from 1.3.2 to 1.5.2, one of our apps that uses
> checkpointing no longer writes metadata files to the state.checkpoints.dir
> location provided to the flink conf.  I see this email chain addressed this
> here:
> https://lists.apache.org/thread.html/922f77880eca2a7b279e153090da2388b54f19e89528a2a35937d9a8@%3Cuser.flink.apache.org%3E
> .
>
> I am still a bit unclear what the workaround is.  We use the metadata
> files to recover when the app goes down or gets restarted.
>
> Thanks,
>
> Bryant
>


Re: How to get the location of keytab when using flink on yarn

2018-09-24 Thread sanmutongzi
Hi Aljoscha,
Sorry for my late response. In my experience, if
flink-conf.yaml sets "security.kerberos.login.keytab" to a keytab file and also sets
"security.kerberos.login.contexts", then YARN will
ship the keytab file to the TaskManager.
I can also find a log entry like this:
 " INFO  org.apache.flink.configuration.GlobalConfiguration-
Loading configuration property: security.kerberos.login.keytab,
/data1/yarn/nm/usercache/hadoop/appcache/application_1537515506704_0007/container_e28_1537515506704_0007_01_01/krb5.keytab"
in the TaskManager log.
My problem is: in user code such as a map or sink function, how can I get
the security.kerberos.login.keytab value in order to log in?

THANKS






When should the RETAIN_ON_CANCELLATION option be used?

2018-09-24 Thread 徐涛
Hi All,
The Flink documentation says:
DELETE_ON_CANCELLATION: “Delete the checkpoint when the job is 
cancelled. The checkpoint state will only be available if the job fails.”
What is the definition of, and the difference between, a job cancellation and a job failure? 
Suppose I run the program on YARN and, after a few days, the YARN application 
fails for some reason.
If I use the DELETE_ON_CANCELLATION option, do I have a 
checkpoint to resume the program from in this case?

If checkpoints are only deleted when I cancel the program, I can 
always take a savepoint before cancellation. So it seems that I can always set 
DELETE_ON_CANCELLATION.
I cannot find a case where RETAIN_ON_CANCELLATION should be used.


Best
Henry



Re: Trying to figure out why a slot takes a long time to checkpoint

2018-09-24 Thread Julio Biason
Hey guys, Stefan,

Yeah, sorry about the stacks. Completely forgot about them.

But I think we figured out why it's taking so long (and yeah, Stefan was
right from the start): This specific slot is receiving 5x more records than
any other slot (on a recent run, it had 10x more records than the second
largest slot) -- and I feel like a complete twit for not realizing this
earlier. We'll try to find out why that slot is getting so many records.
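One way to look into this kind of imbalance is to count how many records each key contributes and how those keys spread over the parallel slots. The following is only a rough sketch in plain Java: the class SlotSkew and its methods are hypothetical, and routing by `hashCode` modulo the parallelism is a simplification, since Flink assigns keys to subtasks through key groups and its own hash function.

```java
import java.util.Arrays;
import java.util.List;

final class SlotSkew {
    // Counts how many records each parallel slot would receive if every
    // record were routed by the hash of its key (simplified routing).
    static long[] recordsPerSlot(List<String> keys, int parallelism) {
        long[] counts = new long[parallelism];
        for (String key : keys) {
            counts[Math.floorMod(key.hashCode(), parallelism)]++;
        }
        return counts;
    }

    // Ratio between the busiest slot and the mean load; 1.0 means even load.
    static double skewFactor(long[] counts) {
        long max = Arrays.stream(counts).max().orElse(0);
        double mean = Arrays.stream(counts).average().orElse(0);
        return mean == 0 ? 0 : max / mean;
    }

    public static void main(String[] args) {
        // One hot key ("a") dominates the sample input.
        List<String> keys = Arrays.asList("a", "a", "a", "a", "b");
        long[] counts = recordsPerSlot(keys, 2);
        System.out.println(Arrays.toString(counts) + " skew=" + skewFactor(counts));
    }
}
```

A skew factor well above 1 on a sample of real keys would suggest a hot key or a poor key distribution rather than a problem in the state backend.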

For now, we switched from FsStateBackend to RocksDBStateBackend with incremental
checkpointing and things seem to have somewhat balanced out, with no more
failures due to timeouts.

If this kind of stuff happens again, I'll send the logs -- with stacks this
time. :p

On Fri, Sep 21, 2018 at 5:19 AM, Stefan Richter  wrote:

> Hi,
>
> any chance that you could also provide some thread dumps from
> blocking/slow machines as mentioned in my previous mail. I think they might
> help more than just logs. Would also be nice if you could create a single
> zip file with all the things instead of a bunch of > 50MB logs.
>
> Best,
> Stefan
>
> On 20.09.2018 at 21:57, Julio Biason wrote:
>
> Hey guys,
>
> Ok, I started the whole pipeline again with DEBUG enabled for everything. I
> captured the logs for all taskmanagers (2 machines with 4 taskmanagers, 1
> machine with 2) and the jobmanager: http://juliobiason.net/files/logs/
>
> I removed the logs from 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.http.wire
> because they seemed too redundant, as well as our own logs, because they
> would just look like noise.
>
> In this session, the operator "Group metrics" (our 
> keyBy(...).window(TumblingEventTimeWindows))
> failed in the first checkpoint, taking longer than 1 hour (our timeout) to
> complete checkpoints on slots 8 and 27; all other slots completed in less
> than 5 minutes.
>
> On Thu, Sep 20, 2018 at 9:21 AM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> A debug log for state backend and checkpoint coordinator could also help.
>>
>>
>> On 20.09.2018 at 14:19, Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>> Hi,
>>
>> if some tasks take like 50 minutes, could you wait until such a
>> checkpoint is in progress and (let’s say after 10 minutes) log into the
>> node and create a (or multiple over time) thread-dump(s) for the JVM that
>> runs the slow checkpointing task. This could help to figure out where it is
>> stuck or waiting.
>>
>> Best,
>> Stefan
>>
>> On 19.09.2018 at 22:30, Julio Biason wrote:
>>
>> Hey guys,
>>
>> So, switching to Ceph/S3 didn't shed any new light on the issue.
>> Although the times are a bit higher, just a few slots are taking an
>> order of magnitude longer to save. So I changed the log level to DEBUG.
>>
>> The problem is: I'm not seeing anything that seems relevant; only pings
>> from ZooKeeper, heartbeats and the S3 connection being closed for being idle.
>>
>> Is there anything else that I should change to DEBUG? Akka? Kafka? Hadoop?
>> ZooKeeper? (Those are, by the default config, bumped to INFO)
>>
>> All of those?
>>
>> On Tue, Sep 18, 2018 at 12:34 PM, Julio Biason 
>> wrote:
>>
>>> Hey Till (and others),
>>>
>>> We don't have debug logs yet, but we decided to remove a related
>>> component: HDFS.
>>>
>>> We are moving the storage to our Ceph install (using S3), which is
>>> running for longer than our HDFS install and we know, for sure, it runs
>>> without any problems (specially 'cause we have more people that understand
>>> Ceph than people that know HDFS at this point).
>>>
>>> If, for some reason, the problem persists, we know it's not the
>>> underlying storage and may be something with our pipeline itself. I'll
>>> enable debug logs, then.
>>>
>>> On Tue, Sep 18, 2018 at 4:20 AM, Till Rohrmann 
>>> wrote:
>>>
>>>> This behavior seems very odd Julio. Could you indeed share the debug
>>>> logs of all Flink processes in order to see why things are taking so long?
>>>>
>>>> The checkpoint size of task #8 is twice as big as the second biggest
>>>> checkpoint. But this should not cause an increase in checkpoint time of a
>>>> factor of 8.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Mon, Sep 17, 2018 at 5:25 AM Renjie Liu 
>>>> wrote:

> Hi, Julio:
> This happens frequently? What state backend do you use? The async
> checkpoint duration and sync checkpoint duration seems normal compared to
> others, it seems that most of the time are spent acking the checkpoint.
>
> On Sun, Sep 16, 2018 at 9:24 AM vino yang 
> wrote:
>
>> Hi Julio,
>>
>> Yes, it seems that fifty-five minutes is really long.
>> However, it is linear with the time and size of the previous task
>> adjacent to it in the diagram.
>> I think your real application is concerned about why Flink accesses
>> HDFS so slowly.
>> You can call the DEBUG log to see if you can find any clues, or post
>> the log to the mailing list to help others analyze the problem for you.
>>
>> Thanks, vino.
>>
>> Julio Biason  

Re: Flink TaskManagers do not start until job is submitted in YARN

2018-09-24 Thread suraj7
Thanks for the clarification, Dawid and Till.

@Till We have a few streaming jobs that need to be running all the time, and
we plan on using the modify tool to update the parallelism of jobs as we scale
the cluster in and out. Knowing the total slots value is crucial to this
workflow.

As Dawid pointed out, is there a switch to restore the old behavior?
If not, is there a way to find/predict total slots value from YARN metrics?
Are you aware of any such workflow?

Thanks,
Suraj





1.5 Checkpoint metadata location

2018-09-24 Thread Bryant Baltes
Hi All,

After upgrading from 1.3.2 to 1.5.2, one of our apps that uses
checkpointing no longer writes metadata files to the state.checkpoints.dir
location provided to the flink conf.  I see this email chain addressed this
here:
https://lists.apache.org/thread.html/922f77880eca2a7b279e153090da2388b54f19e89528a2a35937d9a8@%3Cuser.flink.apache.org%3E
.

I am still a bit unclear what the workaround is.  We use the metadata files
to recover when the app goes down or gets restarted.

Thanks,

Bryant


Re: Flink TaskManagers do not start until job is submitted in YARN

2018-09-24 Thread Till Rohrmann
Hi Suraj,

at the moment Flink's new mode does not support such a behaviour. There are
plans to set a min number of running TaskManagers which won't be released.
But no work has been done in this direction yet, afaik. If you want, then
you can help the community with this effort.

Cheers,
Till

On Mon, Sep 24, 2018 at 3:07 PM Dawid Wysakowicz 
wrote:

> Hi Suraj,
>
> As far as I know this was changed with FLIP-6 to allow dynamic resource
> allocation.
>
> Till, cced might know if there is a switch to restore old behavior or
> are there plans to support it.
>
> Best,
>
> Dawid
>
> On 24/09/18 12:24, suraj7 wrote:
> > Hi,
> >
> > I am using Amazon EMR to run Flink Cluster on YARN. My setup consists of
> > m4.large instances for 1 master and 2 core nodes. I have started the
> Flink
> > Cluster on YARN with the command: flink-yarn-session -n 2 -d -tm 4096 -s
> 4.
> >
> > Flink Job Manager and Application Manager starts but there are no Task
> > Managers running. The Flink Web interface shows 0 for task managers, task
> > slots and slots available. However when I submit a job to flink cluster,
> > then Task Managers get allocated and the job runs and the Web UI shows
> > correct values as expected and goes back to 0 once the job is complete.
> >
> > I would like Task Managers to be running even when no Job is submitted
> as I
> > want to use Flink's REST API to monitor and modify parallelism based on
> the
> > available slots value while scaling Core Nodes.
> >
> > Is there a configuration that I've missed which prevents Task Managers
> from
> > running all the time?
> >
> > Thanks,
> > Suraj
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>


Re: Between Checkpoints in Kafka 11

2018-09-24 Thread Piotr Nowojski
Hi,

I have nothing more to add. You (Dawid) and Vino explained it correctly :)

Piotrek

> On 24 Sep 2018, at 15:16, Dawid Wysakowicz  wrote:
> 
> Hi Harshvardhan,
> 
> Flink won't buffer all the events between checkpoints. Flink uses Kafka's 
> transaction, which are committed only on checkpoints, so the data will be 
> persisted on the Kafka's side, but only available to read once committed.
> I've cced Piotr, who implemented the Kafka 0.11 connector in case he wants to 
> correct me or add something to the answer.
> 
> Best,
> 
> Dawid
> 
> On 23/09/18 17:48, Harshvardhan Agrawal wrote:
>> Hi,
>> 
>> Can someone please help me understand how does the exactly once semantic 
>> work with Kafka 11 in Flink?
>> 
>> Thanks,
>> Harsh
>> 
>> On Tue, Sep 11, 2018 at 10:54 AM Harshvardhan Agrawal 
>> <harshvardhan.ag...@gmail.com> wrote:
>> Hi,
>> 
>> I was going through the blog post on how TwoPhaseCommitSink function works 
>> with Kafka 11. One of the things I don’t understand is: What is the behavior 
>> of the Kafka 11 Producer between two checkpoints? Say that the time interval 
>> between two checkpoints is set to 15 minutes. Will Flink buffer all records 
>> in memory in that case and start writing to Kafka when the next checkpoint 
>> starts?
>> 
>> Thanks!
>> -- 
>> Regards,
>> Harshvardhan
>> 
>> 
>> -- 
>> Regards,
>> Harshvardhan Agrawal
>> 267.991.6618 | LinkedIn 



Re: Between Checkpoints in Kafka 11

2018-09-24 Thread Dawid Wysakowicz
Hi Harshvardhan,

Flink won't buffer all the events between checkpoints. Flink uses
Kafka transactions, which are committed only on checkpoints, so the
data is persisted on the Kafka side but only becomes available to read
once committed.
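To picture this behaviour, here is a toy model in plain Java. It does not use the Kafka or Flink APIs; the class TransactionalLog and its methods are hypothetical names chosen for illustration. It only mirrors the idea stated above: records are written right away inside an open transaction and become readable only when that transaction is committed on a checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a transactional sink; it does not talk to Kafka.
final class TransactionalLog {
    private final List<String> committed = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();

    // Records are handed to the log immediately, inside the open transaction.
    void write(String record) {
        pending.add(record);
    }

    // Called when a checkpoint completes: the open transaction is committed
    // and its records become visible to readers.
    void commitOnCheckpoint() {
        committed.addAll(pending);
        pending.clear();
    }

    // Called on failure before the checkpoint: the open transaction is dropped.
    void abort() {
        pending.clear();
    }

    // What a reader that only sees committed data would get.
    List<String> readCommitted() {
        return new ArrayList<>(committed);
    }

    // Number of records written but not yet readable.
    int uncommittedCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        TransactionalLog log = new TransactionalLog();
        log.write("r1");
        log.write("r2");
        System.out.println("before checkpoint: " + log.readCommitted());
        log.commitOnCheckpoint();
        System.out.println("after checkpoint: " + log.readCommitted());
    }
}
```

On the Kafka side, only consumers configured with `isolation.level=read_committed` wait for the commit; in this model, with a 15-minute checkpoint interval such consumers would see the records with up to 15 minutes of delay.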

I've cced Piotr, who implemented the Kafka 0.11 connector in case he
wants to correct me or add something to the answer.

Best,

Dawid


On 23/09/18 17:48, Harshvardhan Agrawal wrote:
> Hi,
>
> Can someone please help me understand how does the exactly once
> semantic work with Kafka 11 in Flink?
>
> Thanks,
> Harsh
>
> On Tue, Sep 11, 2018 at 10:54 AM Harshvardhan Agrawal
> <harshvardhan.ag...@gmail.com>
> wrote:
>
> Hi,
>
> I was going through the blog post on how TwoPhaseCommitSink
> function works with Kafka 11. One of the things I don’t understand
> is: What is the behavior of the Kafka 11 Producer between two
> checkpoints? Say that the time interval between two checkpoints is
> set to 15 minutes. Will Flink buffer all records in memory in that
> case and start writing to Kafka when the next checkpoint starts?
>
> Thanks!
> -- 
> Regards,
> Harshvardhan
>
>
>
> -- 
> *Regards,
> Harshvardhan Agrawal*
> *267.991.6618 | LinkedIn *





Re: Flink TaskManagers do not start until job is submitted in YARN

2018-09-24 Thread Dawid Wysakowicz
Hi Suraj,

As far as I know this was changed with FLIP-6 to allow dynamic resource
allocation.

Till, cc'd, might know whether there is a switch to restore the old behavior
or whether there are plans to support it.

Best,

Dawid

On 24/09/18 12:24, suraj7 wrote:
> Hi,
>
> I am using Amazon EMR to run Flink Cluster on YARN. My setup consists of
> m4.large instances for 1 master and 2 core nodes. I have started the Flink
> Cluster on YARN with the command: flink-yarn-session -n 2 -d -tm 4096 -s 4.
>
> Flink Job Manager and Application Manager starts but there are no Task
> Managers running. The Flink Web interface shows 0 for task managers, task
> slots and slots available. However when I submit a job to flink cluster,
> then Task Managers get allocated and the job runs and the Web UI shows
> correct values as expected and goes back to 0 once the job is complete.
>
> I would like Task Managers to be running even when no Job is submitted as I
> want to use Flink's REST API to monitor and modify parallelism based on the
> available slots value while scaling Core Nodes. 
>
> Is there a configuration that I've missed which prevents Task Managers from
> running all the time?
>
> Thanks,
> Suraj
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/






Re: error with session window

2018-09-24 Thread Dawid Wysakowicz
Hi Yuvraj,

It looks like a race condition to me. Would it be OK for you to switch
to either event time or ingestion time [1]?

I also cc'd Aljoscha, who might give you a bit more insight.

Best,

Dawid


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_time.html#event-time--processing-time--ingestion-time

On 24/09/18 13:26, yuvraj singh wrote:
> this is my code 
>
>
> // Generic type arguments were lost in the archive. FilteredGeoHashes follows
> // from the method references; String as the category key type is an assumption.
> DataStream<CityWithGeoHashes> cityWithGeoHashesDataStream =
>     filteredGeohashDataStream
>         .keyBy(FilteredGeoHashes::getCity)
>         .window(ProcessingTimeSessionWindows.withGap(Time.seconds(4)))
>         .process(new ProcessWindowFunction<FilteredGeoHashes, CityWithGeoHashes, String, TimeWindow>() {
>             @Override
>             public void process(String city, Context context,
>                     Iterable<FilteredGeoHashes> iterable,
>                     Collector<CityWithGeoHashes> collector) throws Exception {
>                 // Group the elements of the session by category.
>                 Map<String, List<FilteredGeoHashes>> geoHashesPerCategory =
>                     StreamSupport.stream(iterable.spliterator(), false)
>                         .collect(Collectors.groupingBy(FilteredGeoHashes::getCategory));
>                 collector.collect(new CityWithGeoHashes(city, geoHashesPerCategory));
>             }
>         })
>         .name("city-geohashes-processor")
>         .uid("city-geohashes-processor");
>
> On Mon, Sep 24, 2018 at 4:56 PM yuvraj singh
> <19yuvrajsing...@gmail.com > wrote:
>
>
> Hi all , 
>
>
> I  am stuck with this error 
>
>
> please help me .
>
>
> I am using sessionwindow 
>
>
> 2018-09-23 07:15:08,097 INFO 
> org.apache.flink.runtime.taskmanager.Task                     -
> city-geohashes-processor (24/48)
> (26aed9a769743191c7cb0257087e490a) switched from RUNNING to FAILED.
>
> java.lang.UnsupportedOperationException: The end timestamp of a
> processing-time window cannot become earlier than the current
> processing time by merging. Current processing time: 1537667108063
> window: TimeWindow{start=1537667100054, end=1537667107237}
>
>         at
> 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325)
>
>         at
> 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
>
>         at
> 
> org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
>
>         at
> 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
>
>         at
> 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>
>
>
>
> Thanks 
>
> Yuvraj Singh 
>





Re: JMX Configuration: Missing Job Related Beans

2018-09-24 Thread Sayat Satybaldiyev
yep, they're there. thank you!

On Mon, Sep 24, 2018 at 12:54 PM 杨力  wrote:

> They are provided in taskmanagers.
>
> Sayat Satybaldiyev wrote on Mon, Sep 24, 2018 at 6:38 PM:
>
>> Dear all,
>>
>> While configuring JMX with Flink, I don't see some bean metrics that
>> belongs to the job, in particular, the number in/out records per operator.
>> I've checked REST API and those numbers provided there. Does flink provide
>> such bean or there's an additional configuration for it?
>>
>> Here's a list of bean that I see in visual vm:
>> jobmanager.Status.JVM.*
>> jobmanager.job.downtime
>> jobmanager.job.lastCheckpoint*
>> jobmanager.job.RestaringTime
>> jobmanager.job.uptime
>>
>> and a bunch of JVM related one. I've attached a print screen from
>> VisualVM to the email.
>>
>> Configuration for JMX in flink/conf.yaml:
>> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
>> metrics.reporter.jmx.port: 9020-9040
>>
>


Re: error with session window

2018-09-24 Thread yuvraj singh
this is my code


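// Generic type arguments were lost in the archive. FilteredGeoHashes follows
// from the method references; String as the category key type is an assumption.
DataStream<CityWithGeoHashes> cityWithGeoHashesDataStream =
    filteredGeohashDataStream
        .keyBy(FilteredGeoHashes::getCity)
        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(4)))
        .process(new ProcessWindowFunction<FilteredGeoHashes, CityWithGeoHashes, String, TimeWindow>() {
            @Override
            public void process(String city, Context context,
                    Iterable<FilteredGeoHashes> iterable,
                    Collector<CityWithGeoHashes> collector) throws Exception {
                // Group the elements of the session by category.
                Map<String, List<FilteredGeoHashes>> geoHashesPerCategory =
                    StreamSupport.stream(iterable.spliterator(), false)
                        .collect(Collectors.groupingBy(FilteredGeoHashes::getCategory));
                collector.collect(new CityWithGeoHashes(city, geoHashesPerCategory));
            }
        })
        .name("city-geohashes-processor")
        .uid("city-geohashes-processor");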


On Mon, Sep 24, 2018 at 4:56 PM yuvraj singh <19yuvrajsing...@gmail.com>
wrote:

>
> Hi all ,
>
>
> I  am stuck with this error
>
>
> please help me .
>
>
> I am using sessionwindow
>
>
> 2018-09-23 07:15:08,097 INFO  org.apache.flink.runtime.taskmanager.Task
>   - city-geohashes-processor (24/48)
> (26aed9a769743191c7cb0257087e490a) switched from RUNNING to FAILED.
>
> java.lang.UnsupportedOperationException: The end timestamp of a
> processing-time window cannot become earlier than the current processing
> time by merging. Current processing time: 1537667108063 window:
> TimeWindow{start=1537667100054, end=1537667107237}
>
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325)
>
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
>
> at
> org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
>
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
>
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>
>
>
>
> Thanks
>
> Yuvraj Singh
>


error with session window

2018-09-24 Thread yuvraj singh
Hi all ,


I am stuck with this error.


Please help me.


I am using a session window.


2018-09-23 07:15:08,097 INFO  org.apache.flink.runtime.taskmanager.Task - city-geohashes-processor (24/48) (26aed9a769743191c7cb0257087e490a) switched from RUNNING to FAILED.
java.lang.UnsupportedOperationException: The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: 1537667108063 window: TimeWindow{start=1537667100054, end=1537667107237}
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
        at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)




Thanks

Yuvraj Singh


Re: JMX Configuration: Missing Job Related Beans

2018-09-24 Thread 杨力
They are provided in taskmanagers.
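To check which beans a given process exposes, one option is to list MBean names over a JMX connection. The snippet below is a minimal sketch using only the standard `javax.management` API; the class BeanLister is a hypothetical helper, and the demo in `main` queries the local JVM rather than a TaskManager. The Flink domain pattern mentioned in the comment is an assumption and may differ between versions.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

final class BeanLister {
    // Returns the sorted canonical names of all MBeans matching the pattern.
    static Set<String> list(MBeanServerConnection conn, String pattern) {
        try {
            Set<String> names = new TreeSet<>();
            for (ObjectName name : conn.queryNames(new ObjectName(pattern), null)) {
                names.add(name.getCanonicalName());
            }
            return names;
        } catch (Exception e) {
            // Malformed pattern or I/O problem on a remote connection.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Demo against the local JVM. For a TaskManager, obtain the connection
        // with JMXConnectorFactory.connect(...) on the port it picked from the
        // configured range and try a pattern such as
        // "org.apache.flink.taskmanager.*:*" (assumed domain prefix).
        MBeanServerConnection local = ManagementFactory.getPlatformMBeanServer();
        list(local, "java.lang:type=*").forEach(System.out::println);
    }
}
```

Running the same listing against the JobManager port and a TaskManager port should make it visible which process registers the per-operator record counters.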

Sayat Satybaldiyev wrote on Mon, Sep 24, 2018 at 6:38 PM:

> Dear all,
>
> While configuring JMX with Flink, I don't see some bean metrics that
> belongs to the job, in particular, the number in/out records per operator.
> I've checked REST API and those numbers provided there. Does flink provide
> such bean or there's an additional configuration for it?
>
> Here's a list of bean that I see in visual vm:
> jobmanager.Status.JVM.*
> jobmanager.job.downtime
> jobmanager.job.lastCheckpoint*
> jobmanager.job.RestaringTime
> jobmanager.job.uptime
>
> and a bunch of JVM related one. I've attached a print screen from VisualVM
> to the email.
>
> Configuration for JMX in flink/conf.yaml:
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporter.jmx.port: 9020-9040
>


JMX Configuration: Missing Job Related Beans

2018-09-24 Thread Sayat Satybaldiyev
Dear all,

While configuring JMX with Flink, I don't see some of the bean metrics that
belong to the job, in particular the number of in/out records per operator.
I've checked the REST API and those numbers are provided there. Does Flink provide
such beans, or is additional configuration needed for them?

Here's a list of the beans that I see in VisualVM:
jobmanager.Status.JVM.*
jobmanager.job.downtime
jobmanager.job.lastCheckpoint*
jobmanager.job.RestaringTime
jobmanager.job.uptime

and a bunch of JVM-related ones. I've attached a screenshot from VisualVM
to the email.

Configuration for JMX in flink/conf.yaml:
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 9020-9040


Information required regarding SSL algorithms for Flink 1.5.x

2018-09-24 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hello,

We have a question regarding the SSL algorithms available in different Flink versions. 
According to the Flink 1.6.0 documentation, the following SSL algorithm options are 
supported.

security.ssl.algorithms: 
TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
Please let us know if all these options are supported in Flink 1.5.x releases 
as well.
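Independently of the Flink version, whether a configured suite can actually be used probably also depends on the JVM that runs Flink. As a rough check, the sketch below lists which of the requested suites the local JVM's default SSL context supports. It uses only the standard `javax.net.ssl` API; the class CipherCheck is a hypothetical helper and not part of Flink.

```java
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.net.ssl.SSLContext;

final class CipherCheck {
    // Returns the requested cipher suites that this JVM's default SSL
    // context reports as supported, in the order they were requested.
    static List<String> supported(List<String> requested) {
        try {
            Set<String> available = new HashSet<>(Arrays.asList(
                SSLContext.getDefault().getSupportedSSLParameters().getCipherSuites()));
            List<String> result = new ArrayList<>();
            for (String suite : requested) {
                if (available.contains(suite)) {
                    result.add(suite);
                }
            }
            return result;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The four suites listed in the question above.
        List<String> requested = Arrays.asList(
            "TLS_DHE_RSA_WITH_AES_128_GCM_SHA256",
            "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
            "TLS_DHE_RSA_WITH_AES_256_GCM_SHA384",
            "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384");
        System.out.println("Supported by this JVM: " + supported(requested));
    }
}
```

Running this on the same Java installation that hosts the Flink 1.5.x processes would show which of the four names that JVM accepts.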

Thanks,
Suchithra




Flink TaskManagers do not start until job is submitted in YARN

2018-09-24 Thread suraj7
Hi,

I am using Amazon EMR to run Flink Cluster on YARN. My setup consists of
m4.large instances for 1 master and 2 core nodes. I have started the Flink
Cluster on YARN with the command: flink-yarn-session -n 2 -d -tm 4096 -s 4.

The Flink Job Manager and Application Manager start, but there are no Task
Managers running. The Flink Web interface shows 0 for task managers, task
slots and slots available. However, when I submit a job to the Flink cluster,
Task Managers get allocated, the job runs and the Web UI shows the
correct values as expected. The values go back to 0 once the job is complete.

I would like Task Managers to be running even when no Job is submitted as I
want to use Flink's REST API to monitor and modify parallelism based on the
available slots value while scaling Core Nodes. 
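
To make the intent concrete, here is a rough sketch of the kind of logic I have in mind. It assumes that the cluster overview returned by the REST API is JSON containing a numeric "slots-available" field (that field name is my recollection, please verify it against your Flink version), and `SlotOverview` is a hypothetical helper, not a Flink class.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: derives a parallelism from the REST overview response.
class SlotOverview {

    // Assumed field name in the overview JSON; not verified for every Flink version.
    private static final Pattern SLOTS_AVAILABLE =
            Pattern.compile("\"slots-available\"\\s*:\\s*(\\d+)");

    /** Extracts the number of available slots from the overview JSON text. */
    static int slotsAvailable(String overviewJson) {
        Matcher matcher = SLOTS_AVAILABLE.matcher(overviewJson);
        if (!matcher.find()) {
            throw new IllegalArgumentException("No slots-available field in response");
        }
        return Integer.parseInt(matcher.group(1));
    }

    /** Caps the desired parallelism at the available slots, but never goes below 1. */
    static int chooseParallelism(int desired, int available) {
        return Math.max(1, Math.min(desired, available));
    }
}
```

With the behaviour described above, the reported value is 0 while no job is running, so `chooseParallelism` would always fall back to 1. That is exactly why I need the Task Managers to be up before a job is submitted.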

Is there a configuration that I've missed which prevents Task Managers from
running all the time?

Thanks,
Suraj



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


[ANNOUNCE] Weekly community update #39

2018-09-24 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #39. Please post any news and
updates you want to share with the community to this thread.

# Flink 1.6.1 and Flink 1.5.4 released

The community has released new bug fix releases: Flink 1.6.1 and Flink
1.5.4 [1, 2].

# Open source review process contd

The Flink community continues to discuss how to improve PR reviews and to
help our contributors. The discussion is now spread across three threads
[3, 4, 5] for those of you interested in following.

# Continuous Flink benchmarks

Piotr and Nico have been running continuous benchmarks for Flink for quite
some time to see how performance is affected by new commits [6].
The web UI of the benchmark suite can be accessed here [7].

[1] https://flink.apache.org/news/2018/09/20/release-1.6.1.html
[2] https://flink.apache.org/news/2018/09/20/release-1.5.4.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Contributing-1-Pull-Request-Template-tp24266.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Contributing-2-Review-Steps-tp24268.html
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Contributing-3-Review-Tooling-tp24269.html
[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Codespeed-deployment-for-Flink-tp24274.html
[7] http://codespeed.dak8s.net:8000/

Cheers,
Till


Re: Running Flink in Google Cloud Platform (GCP) - can Flink be truly elastic?

2018-09-24 Thread Till Rohrmann
Hi Alexander,

the issue for the reactive mode, the mode which reacts to newly available
resources and scales the job up accordingly, is here:
https://issues.apache.org/jira/browse/FLINK-10407. It does not contain a
lot of details but we are actively working on publishing the corresponding
design document soon. See also
https://issues.apache.org/jira/browse/FLINK-10404 which is related to the
reactive mode.

Cheers,
Till

On Sun, Sep 23, 2018 at 5:33 PM Konstantin Knauf <
konstan...@data-artisans.com> wrote:

> Hi Alexander,
>
> broadly speaking, what you are doing right now, is in line with what is
> currently possible with Apache Flink. Can you share a little bit more
> information about your setup (K8s/Flink-Standalone?
> Job-Mode/Session-Mode?)? You might find Gary's Flink Forward [1] talk
> interesting. He demonstrates how a Flink job automatically scales out, when
> it is given more resources by the resource manager, e.g. Kubernetes. But
> this is still work-in-progress.
>
> Best,
>
> Konstantin
>
> [1]
> https://data-artisans.com/flink-forward-berlin/resources/flink-as-a-library-and-still-as-a-framework
>
>
> On Fri, Sep 21, 2018 at 5:42 PM Dawid Wysakowicz 
> wrote:
>
>> Hi Alexander,
>>
>> I've redirected your question to user mailing list. The goal of
>> community list is for "Broader community discussions related to meetups,
>> conferences, blog posts and job offers"
>>
>> The quick answer to your question is that dynamic scaling of Flink jobs is
>> a work in progress. Maybe Gary or Till (cc'ed) can share some more details
>> on that topic.
>>
>> Best,
>>
>> Dawid
>>
>>
>> On 21/09/18 17:25, alexander.gard...@rbs.com.INVALID wrote:
>> > Hi
>> >
>> > I'm trying to understand what it means to run a Flink cluster inside
>> the Google Cloud Platform and whether it can act in an "elastic" way; if
>> the cluster needs more resources to accommodate a sudden demand or increase
>> in Flink jobs, will GCP automatically detect this and spool up more Task
>> Managers to provide extra task slots?
>> >
>> > If we consider the following two simple use cases, how would GCP
>> address them?
>> >
>> >
>> > 1) No free task slots to run new flink jobs
>> >
>> > 2) A slow flink job needs an increased parallelism to improve
>> throughput
>> >
>> > Currently, we'd handle the above use cases by:
>> >
>> >
>> > 1) knowing that the job failed due to "no free slots", check the
>> exception text, schedule to add a new task manager and rerun the job,
>> knowing that there are now available task slots.
>> >
>> > 2) We'd monitor the speed of the job ourselves, stop the job,
>> specify which components (operators) in the stream reqd an increase in
>> parallelism (for example via job properties), then relaunch the job; if not
>> enough slots were available, we'd have to consider adding extra task
>> managers.
>> >
>> >
>> > So my question is...can Google Cloud Platform (GCP) automatically
>> launch extra TMs to handle the above?
>> >
>> > If we proposed to run a Flink cluster in a GCP container, can GCP make
>> Flink behave dynamically elastic in the same way that Google DataFlow
>> apparently can?
>> >
>> > Regards
>> >
>> >
>> > Alex
>> >
>> >

Re: [External] Re: Setting a custom Kryo serializer in Flink-Python

2018-09-24 Thread Chesnay Schepler

I can't really help you here.

Digging into the backing java internals isn't supported, and neither is 
registering a kryo serializer (which is why it isn't exposed in the 
python environment).
The Jython-related serialization logic doesn't care about Flink's usual 
type serialization mechanism, so using Avro will simply not work. It 
entirely assumes that all data is either created on the Python side or can be 
mapped automatically to a Python type by Jython.


On 18.09.2018 20:05, Joe Malt wrote:
Bumping this (I hope that's OK!) - I've been trying to fix this for a 
week and got nowhere


On Mon, Sep 17, 2018 at 8:40 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:


Hi Joe,

Probably Chesnay (cc’ed) may have a better idea on why this is
happening.

Cheers,
Kostas


On Sep 14, 2018, at 7:30 PM, Joe Malt <jm...@yelp.com> wrote:

Hi,

I'm trying to write a Flink job (with the Python streaming API)
that handles a custom type that needs a custom Kryo serializer.

When we implemented a similar job in Scala we used
addDefaultKryoSerializer, similar to the instructions in

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/custom_serializers.html



In Python, the PythonStreamExecutionEnvironment doesn't have this
method, but it does have an ordinary StreamExecutionEnvironment as
a private field ("env"). I'm using reflection to get this
StreamExecutionEnvironment and calling addDefaultKryoSerializer on
it, but that doesn't seem to work:

f = python_execution_env.getClass().getDeclaredField("env")
f.setAccessible(True)
java_execution_env = f.get(python_execution_env)
java_execution_env.addDefaultKryoSerializer(Message,
MessageKryoSerializer)

With or without these lines, the job crashes with a KryoException
(full stack trace at https://pastebin.com/zxxzCqH0); it doesn't appear that
addDefaultKryoSerializer is doing anything.

Is there an officially supported way to set custom serializers in
Python?

Thanks,

Joe Malt
Engineering Intern, Stream Processing
Yelp









Re: Flink not running properly.

2018-09-24 Thread Sarabjyotsingh Multani
Thanks, I'll check it out.

On Mon, Sep 24, 2018 at 9:49 AM vino yang  wrote:

> Hi,
>
> According to the instructions in the script:
>
> # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used
> TM_MAX_OFFHEAP_SIZE="8388607T"
>
>
>  I think you may need to confirm if your operating system and the JDK you
> installed on the TM are 64-bit.
>
> Thanks, vino.
>
> On Sun, Sep 23, 2018 at 9:59 PM, Sarabjyotsingh Multani wrote:
>
>> Hello Admin,
>>  When I run "tail -f log/flink-*-taskexecutor-*.out" in command
>> line , I get the following error : "Invalid maximum direct memory size:
>> -XX:MaxDirectMemorySize=8388607T
>> The specified size exceeds the maximum representable size.
>> Error: Could not create the Java Virtual Machine.
>> Error: A fatal exception has occurred. Program will exit."
>> Please help.
>>
>>
>>
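
For reference, the arithmetic behind the "8388607T" value in the script can be sketched as follows. This is only an illustration: `DirectMemoryLimit` is a hypothetical helper, and the `sun.arch.data.model` system property is specific to HotSpot-style JVMs, so it is read here with a fallback.

```java
// Hypothetical helper: shows why 8388607T only fits on a 64-bit JVM.
class DirectMemoryLimit {

    private static final long BYTES_PER_TB = 1L << 40;

    /** Converts terabytes to bytes, throwing ArithmeticException if a long cannot hold the result. */
    static long teraBytesToBytes(long teraBytes) {
        return Math.multiplyExact(teraBytes, BYTES_PER_TB);
    }

    /** True if the size is below the 4 GiB limit of a 32-bit address space. */
    static boolean fitsIn32BitAddressSpace(long bytes) {
        return bytes < (1L << 32);
    }

    /** Reports "32" or "64" on HotSpot JVMs, "unknown" where the property is missing. */
    static String jvmDataModel() {
        return System.getProperty("sun.arch.data.model", "unknown");
    }
}
```

8388607 TB is the largest whole number of terabytes that still fits into a signed 64-bit long (8388608 TB would overflow), and it is far beyond what a 32-bit JVM can address, which matches the "exceeds the maximum representable size" message. Checking `jvmDataModel()` (or `java -version`) on the TaskManager host should show whether a 32-bit JVM is in use.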


Re: LocalEnvironment and Python streaming

2018-09-24 Thread Chesnay Schepler
No, this isn't really possible. You need a java process to kick off the 
processing.


The only thing I can come up with is to open the flink-streaming-python 
module in the IDE and manually call the PythonStreamBinder class with 
the same arguments that you pass in the CLI as a test.


On 17.09.2018 04:35, vino yang wrote:

Hi Joe,

Maybe Chesnay is better suited to answer this question; I'm pinging him for you.

Thanks, vino.

On Sat, Sep 15, 2018 at 1:51 AM, Joe Malt <jm...@yelp.com> wrote:


Hi,

Is there any way to execute a job using the LocalEnvironment when
using the Python streaming API? This would make it much easier to
debug jobs.

At the moment I'm not aware of any way of running them except
firing up a local cluster and submitting the job with
pyflink-stream.sh.

Thanks,

Joe Malt
Engineering Intern, Stream Processing
Yelp





Re: ***UNCHECKED*** Error while confirming Checkpoint

2018-09-24 Thread PedroMrChaves
Hello Stefan, 

Thank you for the help.

I've actually lost those logs due to several cluster restarts that we did,
which caused the logs to rotate out (limit = 5 versions).
The log lines that I posted were the only ones that showed signs of
a problem.

The configuration of the job is as follows:

private static final int DEFAULT_MAX_PARALLELISM = 16;
private static final int CHECKPOINTING_INTERVAL = 1000;
private static final int MIN_PAUSE_BETWEEN_CHECKPOINTS = 1000;
private static final int CHECKPOINT_TIMEOUT = 6;
private static final int INTERVAL_BETWEEN_RESTARTS = 120;
(...)

environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
environment.setMaxParallelism(DEFAULT_MAX_PARALLELISM);
environment.enableCheckpointing(CHECKPOINTING_INTERVAL, CheckpointingMode.EXACTLY_ONCE);
environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(MIN_PAUSE_BETWEEN_CHECKPOINTS);
environment.getCheckpointConfig().setCheckpointTimeout(CHECKPOINT_TIMEOUT);
environment.setRestartStrategy(RestartStrategies.noRestart());
environment.setParallelism(parameters.getInt(JOB_PARALLELISM));

The Kafka consumer/producer configuration is:

properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("max.request.size", "1579193");
properties.put("processing.guarantee", "exactly_once");
properties.put("isolation.level", "read_committed");



-
Best Regards,
Pedro Chaves
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Strange behaviour with checkpointing and custom FilePathFilter

2018-09-24 Thread Kostas Kloudas
Hi Averell,

Happy to hear that the problem is no longer there. If you have more news
from your debugging, let us know.

The thing that I wanted to mention is that, from what you are describing, the
problem does not seem to be related to checkpointing, but to the fact that
applying your filter on the 100’s of thousands of small files takes time.

This may help with your debugging.
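
To illustrate the point, a filter that decides from the path string alone avoids one remote status call per file. The sketch below is not Flink code: `CheapPathFilter` is a hypothetical stand-in that only mirrors the convention of Flink's FilePathFilter, where (as far as I remember) returning true means "ignore this path", and the wanted suffix is an assumed parameter.

```java
// Hypothetical stand-in for a path filter that needs no file-system calls.
class CheapPathFilter {

    /**
     * Returns true if the path should be ignored. The decision uses only the
     * last path segment, so no file status has to be fetched from S3.
     */
    static boolean shouldIgnore(String path, String wantedSuffix) {
        int lastSlash = path.lastIndexOf('/');
        String name = lastSlash >= 0 ? path.substring(lastSlash + 1) : path;
        // Hidden and marker files (".xyz", "_SUCCESS") are skipped, as is anything
        // that does not end with the wanted suffix.
        return name.startsWith(".") || name.startsWith("_") || !name.endsWith(wantedSuffix);
    }
}
```

Note that such a name-based rule cannot tell directories from files, so it would need adjusting if nested directories have to be enumerated.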

Cheers,
Kostas

> On Sep 24, 2018, at 2:10 AM, Averell  wrote:
> 
> Hi Vino, and all,
> 
> I tried to avoid the step to get File Status, and found that the problem is
> not there any more. I guess doing that with every single file out of 100K+
> files on S3 caused some issue with checkpointing.
> Still trying to find the cause, but with lower priority now.
> 
> Thanks for your help.
> 
> Regards,
> Averell   
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: error closing kafka

2018-09-24 Thread yuvraj singh
I have one more question:
if I do a keyBy on the stream, will the data be partitioned
automatically?

I ask because I am getting all the data in the same partition in Kafka.
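
My current understanding, which may be wrong, is that keyBy only redistributes records between Flink's parallel subtasks, while the Kafka partition is chosen by the partitioner of the sink. If I read the connector correctly, the default partitioner (FlinkFixedPartitioner) maps each sink subtask to one fixed Kafka partition. The sketch below contrasts that rule with a key-hash rule; `PartitionChoice` is a hypothetical class written for illustration, not the connector's code.

```java
// Hypothetical illustration of two ways to pick a Kafka partition.
class PartitionChoice {

    /** Fixed rule: every record written by a given sink subtask goes to the same partition. */
    static int fixedPartition(int sinkSubtaskIndex, int numKafkaPartitions) {
        return sinkSubtaskIndex % numKafkaPartitions;
    }

    /** Key-hash rule: the partition depends on the record key, not on the subtask. */
    static int keyHashPartition(Object key, int numKafkaPartitions) {
        // floorMod keeps the result non-negative for negative hash codes.
        return Math.floorMod(key.hashCode(), numKafkaPartitions);
    }
}
```

Under the fixed rule, a sink running with parallelism 1 has only subtask 0, so every record would land in partition 0 regardless of the keyBy. That would match what I am seeing.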

Thanks
Yubraj Singh

On Mon, Sep 24, 2018 at 12:34 PM yuvraj singh <19yuvrajsing...@gmail.com>
wrote:

> I am processing data and then sending it to kafka by kafka sink .
>
> this is method where I am producing the data
>
> nudgeDetailsDataStream.keyBy(NudgeDetails::getCarSugarID).addSink(NudgeCarLevelProducer.getProducer(config))
> .name("nudge-details-producer")
> .uid("nudge-details-producer");
>
>
>
>
> its my producer
>
> public class NudgeCarLevelProducer {
>
> static Logger logger = LoggerFactory.getLogger(PeakLocationFinder.class);
> public static FlinkKafkaProducer010 
> getProducer(PeakLocationFinderGlobalConfig config) {
> return new FlinkKafkaProducer010(config.getFabricIncentiveTopic(),
> new NudgeCarLevelSchema(config),
> 
> FlinkKafkaProducerBase.getPropertiesFromBrokerList(config.getInstrumentationBrokers()));
> }
> }
>
>
> class NudgeCarLevelSchema implements SerializationSchema
> {
> Logger logger = LoggerFactory.getLogger(NudgeCarLevelSchema.class);
> ObjectMapper mapper = new ObjectMapper();
> PeakLocationFinderGlobalConfig config;
>
> public NudgeCarLevelSchema(PeakLocationFinderGlobalConfig config)
> {
>this.config  = config;
> }
>
> @Override
> public byte[] serialize(NudgeDetails element) {
> byte [] bytes = null;
> Document document = new Document();
> document.setId(UUID.randomUUID().toString());
> Metadata metadata = new Metadata();
> metadata.setSchema(config.getFabricCarLevelDataStream());
> metadata.setSchemaVersion(1);
> metadata.setTenant(config.getTenantId());
> metadata.setTimestamp(System.currentTimeMillis());
> metadata.setType(Type.EVENT);
> metadata.setSender("nudge");
> metadata.setStream(config.getFabricCarLevelDataStream());
> document.setMetadata(metadata);
> document.setData(element);
> try {
> bytes =  mapper.writeValueAsString(document).getBytes();
> } catch (Exception e) {
> logger.error("error while serializing nudge car level Schema");
> }
> return bytes;
> }
> }
>
>
>
>
> On Mon, Sep 24, 2018 at 12:24 PM miki haiat  wrote:
>
>> What are you trying to do? Can you share some code?
>> This is the reason for the exception:
>> Proceeding to force close the producer since pending requests could not
>> be completed within timeout 9223372036854775807 ms.
>>
>>
>>
>> On Mon, 24 Sep 2018, 9:23 yuvraj singh, <19yuvrajsing...@gmail.com>
>> wrote:
>>
>>> Hi all ,
>>>
>>>
>>> I am getting this error with flink 1.6.0 , please help me .

Re: error closing kafka

2018-09-24 Thread yuvraj singh
I am processing data and then sending it to Kafka through a Kafka sink.

This is the method where I am producing the data:

nudgeDetailsDataStream.keyBy(NudgeDetails::getCarSugarID)
    .addSink(NudgeCarLevelProducer.getProducer(config))
    .name("nudge-details-producer")
    .uid("nudge-details-producer");

This is my producer:

// Imports for Jackson and the application classes are omitted here.
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NudgeCarLevelProducer {

    // Builds the Kafka sink for NudgeDetails records.
    public static FlinkKafkaProducer010<NudgeDetails> getProducer(PeakLocationFinderGlobalConfig config) {
        return new FlinkKafkaProducer010<>(
                config.getFabricIncentiveTopic(),
                new NudgeCarLevelSchema(config),
                FlinkKafkaProducerBase.getPropertiesFromBrokerList(config.getInstrumentationBrokers()));
    }
}

class NudgeCarLevelSchema implements SerializationSchema<NudgeDetails> {

    private static final Logger logger = LoggerFactory.getLogger(NudgeCarLevelSchema.class);

    ObjectMapper mapper = new ObjectMapper();
    PeakLocationFinderGlobalConfig config;

    public NudgeCarLevelSchema(PeakLocationFinderGlobalConfig config) {
        this.config = config;
    }

    // Wraps the element in a Document with metadata and serializes it as JSON.
    @Override
    public byte[] serialize(NudgeDetails element) {
        byte[] bytes = null;
        Document document = new Document();
        document.setId(UUID.randomUUID().toString());
        Metadata metadata = new Metadata();
        metadata.setSchema(config.getFabricCarLevelDataStream());
        metadata.setSchemaVersion(1);
        metadata.setTenant(config.getTenantId());
        metadata.setTimestamp(System.currentTimeMillis());
        metadata.setType(Type.EVENT);
        metadata.setSender("nudge");
        metadata.setStream(config.getFabricCarLevelDataStream());
        document.setMetadata(metadata);
        document.setData(element);
        try {
            bytes = mapper.writeValueAsString(document).getBytes(StandardCharsets.UTF_8);
        } catch (Exception e) {
            // As before, a failed serialization is only logged and null is returned.
            logger.error("error while serializing nudge car level Schema", e);
        }
        return bytes;
    }
}




On Mon, Sep 24, 2018 at 12:24 PM miki haiat  wrote:

> What are you trying to do? Can you share some code?
> This is the reason for the exception:
> Proceeding to force close the producer since pending requests could not be
> completed within timeout 9223372036854775807 ms.
>
>
>
> On Mon, 24 Sep 2018, 9:23 yuvraj singh, <19yuvrajsing...@gmail.com> wrote:
>
>> Hi all ,
>>
>>
>> I am getting this error with flink 1.6.0 , please help me .

Re: error closing kafka

2018-09-24 Thread miki haiat
What are you trying to do? Can you share some code?
This is the reason for the exception:
Proceeding to force close the producer since pending requests could not be
completed within timeout 9223372036854775807 ms.
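
The timeout in that message, 9223372036854775807 ms, is Long.MAX_VALUE, so the close is effectively waiting without a limit for the producer's I/O thread. One plausible reading of the trace is that the task thread was interrupted while it was blocked in that wait. The following is a minimal plain-Java sketch of that mechanism; `JoinInterruptDemo` is a hypothetical demo class and uses neither Kafka nor Flink.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo: an unbounded Thread.join() fails when the waiting thread is interrupted.
class JoinInterruptDemo {

    static boolean joinWasInterrupted() {
        // Stands in for the producer's I/O thread, which still has pending work.
        Thread ioThread = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException ignored) {
                // Woken up at the end of the demo.
            }
        });
        ioThread.setDaemon(true);
        ioThread.start();

        AtomicBoolean interrupted = new AtomicBoolean(false);
        CountDownLatch closing = new CountDownLatch(1);
        // Stands in for the task thread that closes the producer.
        Thread taskThread = new Thread(() -> {
            closing.countDown();
            try {
                ioThread.join(); // no time limit, like a Long.MAX_VALUE timeout
            } catch (InterruptedException e) {
                interrupted.set(true); // corresponds to "Interrupted while joining ioThread"
            }
        });
        taskThread.start();

        try {
            closing.await();
            taskThread.interrupt(); // what a cancelling runtime does to a task thread
            taskThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            ioThread.interrupt(); // let the stand-in I/O thread finish
        }
        return interrupted.get();
    }
}
```

If this reading is right, the interesting question is what interrupted the task (for example a cancellation or a failure elsewhere in the job), which is why seeing the code and the earlier log lines would help.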



On Mon, 24 Sep 2018, 9:23 yuvraj singh, <19yuvrajsing...@gmail.com> wrote:

> Hi all ,
>
>
> I am getting this error with flink 1.6.0 , please help me .
>
>
>
>
>
>
> 2018-09-23 07:15:08,846 ERROR
> org.apache.kafka.clients.producer.KafkaProducer   -
> Interrupted while joining ioThread
>
> java.lang.InterruptedException
>
> at java.lang.Object.wait(Native Method)
>
> at java.lang.Thread.join(Thread.java:1257)
>
> at
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
>
> at
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
>
> at
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
>
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>
> at java.lang.Thread.run(Thread.java:745)
>
> 2018-09-23 07:15:08,847 INFO  org.apache.kafka.clients.producer.KafkaProducer
>   - Proceeding to force close the producer since pending
> requests could not be completed within timeout 9223372036854775807 ms.
>
> 2018-09-23 07:15:08,860 ERROR
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
> during disposal of stream operator.
>
> org.apache.kafka.common.KafkaException: Failed to close kafka producer
>
> at
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:734)
>
> at
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
>
> at
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
>
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.InterruptedException
>
> at java.lang.Object.wait(Native Method)
>
> at java.lang.Thread.join(Thread.java:1257)
>
> at
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
>
> ... 9 more
>
>
> Thanks
>
> Yubraj Singh
>


Re: Flink 1.5.4 -- issues w/ TaskManager connecting to ResourceManager

2018-09-24 Thread alex
We started to see the same errors after upgrading from Flink 1.4.2 to 1.6.0. We
have one JM and 5 TMs on Kubernetes. The JM is running in HA mode. The
TaskManagers sometimes lose their connection to the JM and report the following
error, like the one you have:

2018-09-19 12:36:40,687 INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
resolve ResourceManager address
akka.tcp://flink@flink-jobmanager:50002/user/resourcemanager, retrying in
1 ms: Ask timed out on
[ActorSelection[Anchor(akka.tcp://flink@flink-jobmanager:50002/),
Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message of
type "akka.actor.Identify".

Once a TM starts reporting "Could not resolve ResourceManager", it does not
recover until I restart the TM pod.

Here is the content of our flink-conf.yaml:
blob.server.port: 6124
jobmanager.rpc.address: flink-jobmanager
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 4096
jobmanager.web.history: 20
jobmanager.archive.fs.dir: s3://our_path
taskmanager.rpc.port: 6121
taskmanager.heap.mb: 16384
taskmanager.numberOfTaskSlots: 10
taskmanager.log.path: /opt/flink/log/output.log
web.log.path: /opt/flink/log/output.log
state.checkpoints.num-retained: 3
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

high-availability: zookeeper
high-availability.jobmanager.port: 50002
high-availability.zookeeper.quorum: zookeeper_instance_list
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: profileservice
high-availability.storageDir: s3://our_path
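
To narrow this down, one thing we plan to try from inside an affected TM pod is a plain TCP connect to the JM address and the HA port from the configuration above (flink-jobmanager:50002). The sketch below is a generic probe, not Flink code; `PortProbe` is a hypothetical helper, and a successful connect would only show that the network path is open, not that the ResourceManager actor can be resolved.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP connection to host:port can be opened.
class PortProbe {

    /** Returns true if a TCP connection could be established within the timeout. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers unknown hosts, refused connections and timeouts.
            return false;
        }
    }
}
```

If `canConnect("flink-jobmanager", 50002, 2000)` fails in a stuck pod but succeeds in a freshly restarted one, that would point to DNS or networking inside the cluster rather than to Flink itself.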

Any help will be greatly appreciated!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


error closing kafka

2018-09-24 Thread yuvraj singh
Hi all,

I am getting this error with Flink 1.6.0. Please help me.

2018-09-23 07:15:08,846 ERROR org.apache.kafka.clients.producer.KafkaProducer - Interrupted while joining ioThread
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1257)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)

2018-09-23 07:15:08,847 INFO  org.apache.kafka.clients.producer.KafkaProducer - Proceeding to force close the producer since pending requests could not be completed within timeout 9223372036854775807 ms.

2018-09-23 07:15:08,860 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
org.apache.kafka.common.KafkaException: Failed to close kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:734)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1257)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
    ... 9 more


Thanks

Yubraj Singh