Re: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

2019-04-22 Thread Dawid Wysakowicz
Hi Oytun,

I think there is a regression introduced in 1.8 in how we handle output
tags. The problem is that we do not call the ClosureCleaner on the OutputTag.

There are two ways you can work around this issue:

1. Declare the OutputTag static

2. Clean the closure explicitly, as Guowei suggested (see the sketch below):
StreamExecutionEnvironment.clean(pendingProjectsTag)
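
For reference, a minimal sketch of both workarounds (the job skeleton and
names here are hypothetical; only the OutputTag handling matters):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.OutputTag;

public class PendingProjectsJob {

    // Workaround 1: a static OutputTag captures no enclosing instance, so
    // nothing non-serializable is dragged into the selector's closure.
    private static final OutputTag<String> pendingProjectsTag =
            new OutputTag<String>("pending-projects") {};

    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Workaround 2: clean the closure explicitly before handing the
        // tag to flatSelect.
        OutputTag<String> cleanedTag = env.clean(pendingProjectsTag);

        // ... build the PatternStream and pass cleanedTag (or the static
        // tag) to patternStream.flatSelect(tag, timeoutFn, selectFn) ...
    }
}
```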

I also opened a JIRA issue to fix this (FLINK-12297 [1]).

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-12297

On 22/04/2019 03:06, Guowei Ma wrote:
> I think you could try
> StreamExecutionEnvironment.clean(pendingProjectsTag). 
>
>
> Oytun Tez <oy...@motaword.com> wrote on Friday, 2019-04-19 at 9:58 PM:
>
> Forgot to answer one of your points: the parent class compiles
> well without this CEP selector (with timeout signature)...
>
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com  — www.motaword.com
> 
>
>
> On Fri, Apr 19, 2019 at 9:40 AM Oytun Tez <oy...@motaword.com> wrote:
>
> Hey JingsongLee!
>
> Here are some findings...
>
>   * flatSelect *without timeout* works normally:
>     patternStream.flatSelect(PatternFlatSelectFunction) compiles well.
>   * Converted both the timeout and select selectors to *inner
>     classes* (not static); this yielded the same result, doesn't compile.
>   * flatSelect *without* timeout, but with an inner class for
>     PatternFlatSelectFunction, compiles (same as the first bullet).
>   * Tried both of these selectors with an _empty_ body, just a
>     skeleton class. Doesn't compile either. An empty-body example
>     is in my first email.
>   * Tried making both selectors *public static inner* classes;
>     doesn't compile either.
>   * Extracted both the timeout and flat selectors into their own
>     *independent classes* in separate files. Doesn't compile.
>   * I am putting the *error stack* below.
>   * Without the timeout selector in any class or lambda shape,
>     with an empty or full body, flatSelect compiles well.
>
> Would these findings help? Any ideas?
>
> Here is an error stack:
>
> 09:36:51,925 ERROR com.motaword.ipm.kernel.error.controller.ExceptionHandler -
> org.apache.flink.api.common.InvalidProgramException: The implementation
> of the PatternFlatSelectAdapter is not serializable. The object probably
> contains or references non-serializable fields.
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
> at org.apache.flink.cep.PatternStreamBuilder.clean(PatternStreamBuilder.java:86)
> at org.apache.flink.cep.PatternStream.process(PatternStream.java:114)
> at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:451)
> at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:408)
> at com.motaword.ipm.business.invitation.controller.PendingProjects.getPending(PendingProjects.java:89)
> at com.motaword.ipm.business.invitation.controller.PendingProjects.run(PendingProjects.java:45)
> at com.motaword.ipm.business.invitation.boundary.InvitationJob.run(InvitationJob.java:31)
> at com.motaword.ipm.kernel.Application.main(Application.java:63)
> Caused by: java.io.NotSerializableException:
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.Obj

Re: Missing state in RocksDB checkpoints

2019-04-22 Thread Congxian Qiu
Hi, Ning

From the log messages you gave, the two operators share the same directory, and
when a snapshot is taken, the directory is deleted first if it
exists (RocksIncrementalSnapshotStrategy#prepareLocalSnapshotDirectory).

I did not find an existing issue for this problem, and I don't think this is a
UUID generation problem; please check the path generation logic in
LocalRecoveryDirectoryProviderImpl#subtaskSpecificCheckpointDirectory.
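
As a rough illustration of why chained operators can collide (a hypothetical
simplification, not the actual Flink code): the directory is derived from the
job vertex and the subtask, not from the operator ID, so all operators in one
chain resolve to the same path, matching the listing quoted below.

```java
import java.io.File;

class LocalSnapshotPathSketch {
    // Simplified: the local snapshot directory is keyed only by
    // (job, job vertex, subtask, checkpoint). Chained operators share all
    // four components, so preparing the second operator's snapshot
    // directory deletes the first operator's data.
    static File subtaskSpecificCheckpointDirectory(
            File rootDir, String jobId, String jobVertexId,
            int subtaskIndex, long checkpointId) {
        return new File(rootDir, String.format(
                "jid_%s/vtx_%s_sti_%d/chk_%d",
                jobId, jobVertexId, subtaskIndex, checkpointId));
    }
}
```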

I’ve created an issue for this problem.

Best, Congxian
On Apr 23, 2019, 11:19 +0800, Ning Shi , wrote:
> We have a Flink job using the RocksDB state backend. We found that one
> RichMapFunction's state was not being saved in checkpoints or savepoints. After
> some digging, it seems that two operators in the same operator chain are
> colliding with each other during checkpoints or savepoints, resulting in one
> operator's state being missing.
>
> I extracted the checkpoint directories for all operators from the RocksDB LOG
> files for one of the checkpoints. As you can see, the StreamMap operator
> shares the same checkpoint directory as the CoBroadcastWithKeyedOperator.
> They are in the same operator chain.
>
> /var/flink/data/localState/aid_AllocationID{37a99d74a8e452ff06257c61ab13a3c8}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_54b7f0cbe350c274d942032aa504dbdd_sti_1/chk_21244/rocks_db
>  CoStreamFlatMap_54b7f0cbe350c274d942032aa504dbdd__2_90__
> /var/flink/data/localState/aid_AllocationID{37a99d74a8e452ff06257c61ab13a3c8}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_53/chk_21244/rocks_db
>  CoBroadcastWithKeyedOperator_567adb020dcc57a12c17bd43c00b0f55__54_90__
> /var/flink/data/localState/aid_AllocationID{37a99d74a8e452ff06257c61ab13a3c8}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_53/chk_21244/rocks_db
>  StreamMap_3c5866a6cc097b462de842b2ef91910d__54_90__
> /var/flink/data/localState/aid_AllocationID{37a99d74a8e452ff06257c61ab13a3c8}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_bc2936094388a70852534bd6c0fce178_sti_77/chk_21244/rocks_db
>  WindowOperator_bc2936094388a70852534bd6c0fce178__78_90__
> /var/flink/data/localState/aid_AllocationID{5cde66b8a81c5202f7685928bb18ac00}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_54b7f0cbe350c274d942032aa504dbdd_sti_84/chk_21244/rocks_db
>  CoStreamFlatMap_54b7f0cbe350c274d942032aa504dbdd__85_90__
> /var/flink/data/localState/aid_AllocationID{5cde66b8a81c5202f7685928bb18ac00}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_66/chk_21244/rocks_db
>  CoBroadcastWithKeyedOperator_567adb020dcc57a12c17bd43c00b0f55__67_90__
> /var/flink/data/localState/aid_AllocationID{5cde66b8a81c5202f7685928bb18ac00}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_66/chk_21244/rocks_db
>  StreamMap_3c5866a6cc097b462de842b2ef91910d__67_90__
> /var/flink/data/localState/aid_AllocationID{5cde66b8a81c5202f7685928bb18ac00}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_bc2936094388a70852534bd6c0fce178_sti_15/chk_21244/rocks_db
>  WindowOperator_bc2936094388a70852534bd6c0fce178__16_90__
> /var/flink/data/localState/aid_AllocationID{61cf4a285199ab779ec85784980d15e2}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_54b7f0cbe350c274d942032aa504dbdd_sti_53/chk_21244/rocks_db
>  CoStreamFlatMap_54b7f0cbe350c274d942032aa504dbdd__54_90__
> /var/flink/data/localState/aid_AllocationID{61cf4a285199ab779ec85784980d15e2}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_1/chk_21244/rocks_db
>  CoBroadcastWithKeyedOperator_567adb020dcc57a12c17bd43c00b0f55__2_90__
> /var/flink/data/localState/aid_AllocationID{61cf4a285199ab779ec85784980d15e2}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_1/chk_21244/rocks_db
>  StreamMap_3c5866a6cc097b462de842b2ef91910d__2_90__
> /var/flink/data/localState/aid_AllocationID{61cf4a285199ab779ec85784980d15e2}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_bc2936094388a70852534bd6c0fce178_sti_56/chk_21244/rocks_db
>  WindowOperator_bc2936094388a70852534bd6c0fce178__57_90__
> /var/flink/data/localState/aid_AllocationID{a93df5bf51a4f9b3673cd18b46abbecb}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_54b7f0cbe350c274d942032aa504dbdd_sti_30/chk_21244/rocks_db
>  CoStreamFlatMap_54b7f0cbe350c274d942032aa504dbdd__31_90__
> /var/flink/data/localState/aid_AllocationID{a93df5bf51a4f9b3673cd18b46abbecb}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_46/chk_21244/rocks_db
>  CoBroadcastWithKeyedOperator_567adb020dcc57a12c17bd43c00b0f55__47_90__
> /var/flink/data/localState/aid_AllocationID{a93df5bf51a4f9b3673cd18b46abbecb}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_46/chk_21244/rocks_db
>  StreamMap_3c5866a6cc097b462de842b2ef91910d__47_90__
> /var/flink/data/localState/aid_AllocationID{a93df5bf51a4f9b3673cd18b46abbecb}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_bc2936094388a70852534bd6c0fce178_sti_12/chk_21244/rocks_db
> 

Re: status on FLINK-7129

2019-04-22 Thread Dawid Wysakowicz
Hi Kant,

I'm afraid Konstantin is right. Unfortunately AFAIK there is no active
development on that issue.

Best,

Dawid

On 22/04/2019 18:20, Konstantin Knauf wrote:
> Hi Kant,
>
> as far as I know, no one is currently working on this. Dawid (cc)
> maybe knows more.
>
> Cheers,
>
> Konstantin
>
> On Sat, Apr 20, 2019 at 12:12 PM kant kodali  > wrote:
>
> Hi All,
>
> There seems to be a lot of interest
> for https://issues.apache.org/jira/browse/FLINK-7129
>
> Any rough idea on the status of this issue?
>
> Thanks!
>
>
>
> -- 
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
> Planned Absences: 17.04.2019 - 26.04.2019
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward - The Apache
> Flink Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen




Missing state in RocksDB checkpoints

2019-04-22 Thread Ning Shi
We have a Flink job using the RocksDB state backend. We found that one
RichMapFunction's state was not being saved in checkpoints or savepoints. After
some digging, it seems that two operators in the same operator chain are
colliding with each other during checkpoints or savepoints, resulting in one
operator's state being missing.

I extracted the checkpoint directories for all operators from the RocksDB LOG
files for one of the checkpoints. As you can see, the StreamMap operator shares
the same checkpoint directory as the CoBroadcastWithKeyedOperator. They are in
the same operator chain.

/var/flink/data/localState/aid_AllocationID{37a99d74a8e452ff06257c61ab13a3c8}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_54b7f0cbe350c274d942032aa504dbdd_sti_1/chk_21244/rocks_db
   CoStreamFlatMap_54b7f0cbe350c274d942032aa504dbdd__2_90__
/var/flink/data/localState/aid_AllocationID{37a99d74a8e452ff06257c61ab13a3c8}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_53/chk_21244/rocks_db
  CoBroadcastWithKeyedOperator_567adb020dcc57a12c17bd43c00b0f55__54_90__
/var/flink/data/localState/aid_AllocationID{37a99d74a8e452ff06257c61ab13a3c8}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_53/chk_21244/rocks_db
  StreamMap_3c5866a6cc097b462de842b2ef91910d__54_90__
/var/flink/data/localState/aid_AllocationID{37a99d74a8e452ff06257c61ab13a3c8}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_bc2936094388a70852534bd6c0fce178_sti_77/chk_21244/rocks_db
  WindowOperator_bc2936094388a70852534bd6c0fce178__78_90__
/var/flink/data/localState/aid_AllocationID{5cde66b8a81c5202f7685928bb18ac00}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_54b7f0cbe350c274d942032aa504dbdd_sti_84/chk_21244/rocks_db
  CoStreamFlatMap_54b7f0cbe350c274d942032aa504dbdd__85_90__
/var/flink/data/localState/aid_AllocationID{5cde66b8a81c5202f7685928bb18ac00}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_66/chk_21244/rocks_db
  CoBroadcastWithKeyedOperator_567adb020dcc57a12c17bd43c00b0f55__67_90__
/var/flink/data/localState/aid_AllocationID{5cde66b8a81c5202f7685928bb18ac00}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_66/chk_21244/rocks_db
  StreamMap_3c5866a6cc097b462de842b2ef91910d__67_90__
/var/flink/data/localState/aid_AllocationID{5cde66b8a81c5202f7685928bb18ac00}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_bc2936094388a70852534bd6c0fce178_sti_15/chk_21244/rocks_db
  WindowOperator_bc2936094388a70852534bd6c0fce178__16_90__
/var/flink/data/localState/aid_AllocationID{61cf4a285199ab779ec85784980d15e2}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_54b7f0cbe350c274d942032aa504dbdd_sti_53/chk_21244/rocks_db
  CoStreamFlatMap_54b7f0cbe350c274d942032aa504dbdd__54_90__
/var/flink/data/localState/aid_AllocationID{61cf4a285199ab779ec85784980d15e2}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_1/chk_21244/rocks_db
   CoBroadcastWithKeyedOperator_567adb020dcc57a12c17bd43c00b0f55__2_90__
/var/flink/data/localState/aid_AllocationID{61cf4a285199ab779ec85784980d15e2}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_1/chk_21244/rocks_db
   StreamMap_3c5866a6cc097b462de842b2ef91910d__2_90__
/var/flink/data/localState/aid_AllocationID{61cf4a285199ab779ec85784980d15e2}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_bc2936094388a70852534bd6c0fce178_sti_56/chk_21244/rocks_db
  WindowOperator_bc2936094388a70852534bd6c0fce178__57_90__
/var/flink/data/localState/aid_AllocationID{a93df5bf51a4f9b3673cd18b46abbecb}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_54b7f0cbe350c274d942032aa504dbdd_sti_30/chk_21244/rocks_db
  CoStreamFlatMap_54b7f0cbe350c274d942032aa504dbdd__31_90__
/var/flink/data/localState/aid_AllocationID{a93df5bf51a4f9b3673cd18b46abbecb}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_46/chk_21244/rocks_db
  CoBroadcastWithKeyedOperator_567adb020dcc57a12c17bd43c00b0f55__47_90__
/var/flink/data/localState/aid_AllocationID{a93df5bf51a4f9b3673cd18b46abbecb}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_46/chk_21244/rocks_db
  StreamMap_3c5866a6cc097b462de842b2ef91910d__47_90__
/var/flink/data/localState/aid_AllocationID{a93df5bf51a4f9b3673cd18b46abbecb}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_bc2936094388a70852534bd6c0fce178_sti_12/chk_21244/rocks_db
  WindowOperator_bc2936094388a70852534bd6c0fce178__13_90__
/var/flink/data/localState/aid_AllocationID{f6241daa33001250c3f2934a8ba6b506}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_54b7f0cbe350c274d942032aa504dbdd_sti_46/chk_21244/rocks_db
  CoStreamFlatMap_54b7f0cbe350c274d942032aa504dbdd__47_90__
/var/flink/data/localState/aid_AllocationID{f6241daa33001250c3f2934a8ba6b506}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_30/chk_21244/rocks_db
  CoBroadcastWithKeyedOperator_567adb020dcc57a12c17bd43c00b0f55__31_90__
/var/flink/data/localState/aid_AllocationID{f6241daa33001250c3f2934a8ba6b5

Re: How to pass application name when using FlinkKinesisConsumer

2019-04-22 Thread Biao Liu
Hi Xixi,

I'm not sure I have understood correctly: do you mean you would like to set
the Kinesis consumer's name?
Does the API "StreamExecutionEnvironment.addSource($KinesisConsumer,
$sourceName)" satisfy this?
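
For example, a minimal sketch (the stream name, region, and source name below
are hypothetical):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class NamedKinesisSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties consumerConfig = new Properties();
        consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");

        // The second argument of addSource sets the source's operator name,
        // which is what appears in the web UI, logs, and metrics.
        DataStream<String> stream = env.addSource(
                new FlinkKinesisConsumer<>(
                        "my-kinesis-stream",        // hypothetical stream name
                        new SimpleStringSchema(),
                        consumerConfig),
                "my-application-source-name");      // hypothetical source name

        stream.print();
        env.execute("named-kinesis-source-example");
    }
}
```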

Xixi Li wrote on Tuesday, 2019-04-23 at 3:25 AM:

>
> Hi
>
> I have a question about how we can set up a Kinesis consumer with a
> specified applicationName. We are currently using flink-connector-kinesis
> version 1.5, but we can't find anywhere to set up the applicationName.
> Thank you very much!
>
> Regards,
> Xixi
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Netty channel closed at AKKA gated status

2019-04-22 Thread Wenrui Meng
Thanks. We found the relevant NodeManager log and figured out that the lost
task manager was killed by YARN due to the memory limit. @zhijiang @Biao Liu,
thanks for your help.

On Sun, Apr 21, 2019 at 11:45 PM zhijiang 
wrote:

> Hi Wenrui,
>
> I think you could trace the NodeManager log, which contains the
> lifecycle of this task executor. Maybe this task executor was killed by the
> NodeManager because of memory overuse.
>
> Best,
> Zhijiang
>
> --
> From:Wenrui Meng 
> Send Time: 2019-04-20 (Saturday) 09:48
> To:zhijiang 
> Cc:Biao Liu ; user ; tzulitai <
> tzuli...@apache.org>
> Subject:Re: Netty channel closed at AKKA gated status
>
> Attached the last lines of the lost task manager's log. Can anyone help take
> a look?
>
> Thanks,
> Wenrui
>
> On Fri, Apr 19, 2019 at 6:32 PM Wenrui Meng  wrote:
> Looked at a few instances of the same issue. The lost task manager was indeed
> not active anymore, since no log line for that task manager was printed after
> the connection issue's timestamp. I guess that task manager somehow died
> silently, without any exception or termination-related information logged. I
> double-checked the lost task manager's host: GC, CPU, memory, network, and
> disk I/O all look good, without any spike. Is there any other way that the
> task manager could have been terminated? We run our jobs in a YARN cluster.
>
> On Mon, Apr 15, 2019 at 10:47 PM zhijiang 
> wrote:
> Hi Wenrui,
>
> You might further check whether there is a network connection issue
> between the job master and the target task executor, if you confirm the
> target task executor is still alive.
>
> Best,
> Zhijiang
> --
> From:Biao Liu 
> Send Time: 2019-04-16 (Tuesday) 10:14
> To:Wenrui Meng 
> Cc:zhijiang ; user ;
> tzulitai 
> Subject:Re: Netty channel closed at AKKA gated status
>
> Hi Wenrui,
> If a task manager is killed (kill -9), it has no chance to log
> anything. If the task manager exits due to a connection timeout, there would
> be something in the log file. So it was probably killed by another user or by
> the operating system. Please check the operating system's log. BTW, I don't
> think "DEBUG log level" would help.
>
> Wenrui Meng wrote on Tuesday, 2019-04-16 at 9:16 AM:
> There is no exception or any warning in the task manager
> 'athena592-phx2/10.80.118.166:44177' log. In addition, the host was not
> shut down either, according to the cluster monitoring dashboard. It probably
> requires turning on DEBUG logging to get more useful information. If the task
> manager gets killed, I assume there will be a termination entry in the task
> manager log. If not, I don't know how to figure out whether the task manager
> got killed or it was just a connection timeout.
>
>
>
> On Sun, Apr 14, 2019 at 7:22 PM zhijiang 
> wrote:
> Hi Wenrui,
>
> I think the AKKA gated issue and the inactive Netty channel are both caused
> by a task manager exiting or being killed. You should double-check the status
> and the reason for this task manager 'athena592-phx2/10.80.118.166:44177'.
>
> Best,
> Zhijiang
> --
> From:Wenrui Meng 
> Send Time: 2019-04-13 (Saturday) 01:01
> To:user 
> Cc:tzulitai 
> Subject:Netty channel closed at AKKA gated status
>
> We encountered the Netty channel inactive issue while AKKA had gated that
> task manager. I'm wondering whether the channel closed because of the AKKA
> gated status, since all messages to the task manager are dropped at that
> moment, which might cause a Netty channel exception. If so, shall we have
> coordination between AKKA and Netty? The gated status is not intended to
> fail the system. Here is the stack trace of the exception:
>
> 2019-04-12 12:46:38.413 [flink-akka.actor.default-dispatcher-90] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
> checkpoint 3758 (3788228399 bytes in 5967 ms).
> 2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN
> akka.remote.ReliableDeliverySupervisor
> flink-akka.remote.default-remote-dispatcher-25 - Association with remote
> system [akka.tcp://flink@athena592-phx2:44487] has failed, address is now
> gated for [5000] ms. Reason: [Disassociated]
> 2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN
> akka.remote.ReliableDeliverySupervisor
> flink-akka.remote.default-remote-dispatcher-25 - Association with remote
> system [akka.tcp://flink@athena592-phx2:44487] has failed, address is now
> gated for [5000] ms. Reason: [Disassociated]
> 2019-04-12 12:49:14.230 [flink-akka.actor.default-dispatcher-65] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  - id (14/96)
> (93fcbfc535a190e1edcfd913d5f304fe) switched from RUNNING to FAILED.
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connection unexpectedly closed by remote task manager 'athena592-phx2/
> 10.80.118.166:44177'. This might indicate that the remote task manager
> was lost.
> at
> org.apache.fl

Re: assignTimestampsAndWatermarks not work after KeyedStream.process

2019-04-22 Thread an0
Thanks, I feel I'm getting closer to the truth.

So parallelism is the cause? Say my parallelism is 2. Does that mean I get 2
tasks running after `keyBy`, even if all elements have the same key and thus go
to one downstream task (say task 1)? And is it the other task (task 2), with no
incoming data, that causes the `timeWindowAll` stream to be unable to progress?
Because both task 1 and task 2 are its input streams, and one is idling, so its
event time cannot make progress?
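
If I understand correctly, the behavior would be something like this sketch (a
hypothetical simplification, not the actual Flink code): the event-time clock
of `timeWindowAll` is the minimum watermark over all of its input channels, so
a channel that never receives elements never advances and pins the combined
watermark:

```java
class CombinedWatermarkSketch {
    // timeWindowAll sees the minimum watermark across its input channels.
    // A channel that never received an element stays at Long.MIN_VALUE and
    // keeps the combined watermark from advancing, so windows never fire.
    static long combinedWatermark(long[] channelWatermarks) {
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm); // an idle channel stays at Long.MIN_VALUE
        }
        return min;
    }
}
```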

On 2019/04/22 01:57:39, Guowei Ma  wrote: 
> Hi,
> 
> A BoundedOutOfOrdernessTimestampExtractor can only send a WM after it
> receives an element.
> 
> For after keyBy:
> Flink uses the hash code of the key and the parallelism of the downstream
> operator to decide which subtask will receive the element. This means that
> if your key is always the same, all the sources will only send the elements
> to the same downstream task, for example only the no. 3
> BoundedOutOfOrdernessTimestampExtractor.
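
A simplified sketch of the routing Guowei describes (hypothetical; the real
logic lives in Flink's KeyGroupRangeAssignment):

```java
import org.apache.flink.util.MathUtils;

class KeyRoutingSketch {
    // The murmur-hashed key hashCode picks a key group, and the key group
    // maps to exactly one downstream subtask. A constant key therefore
    // always lands on the same subtask, leaving the others idle.
    static int targetSubtask(Object key, int maxParallelism, int parallelism) {
        int keyGroup = MathUtils.murmurHash(key.hashCode()) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }
}
```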
> 
> For before keyBy:
> In your case, the source and the BoundedOutOfOrdernessTimestampExtractors
> would be chained together, which means every
> BoundedOutOfOrdernessTimestampExtractor will receive elements.
> 
> Best,
> Guowei
> 
> 
> an0 wrote on Friday, 2019-04-19 at 10:41 PM:
> 
> > Hi,
> >
> > First of all, thank you for the `shuffle()` tip. It works. However, I
> > still don't understand why it doesn't work without calling `shuffle()`.
> >
> > Why wouldn't all BoundedOutOfOrdernessTimestampExtractors receive trips?
> > All the trips have keys and timestamps. As I said in my reply to Paul, I see
> > the same watermarks being extracted.
> >
> > How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy`
> > matter? My understanding is that any specific window for a specific key
> > always receives exactly the same data, and the calling order of
> > `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
> >
> > To make `keyBy` as irrelevant as possible, I tried letting it always
> > return the same key so that there is only 1 keyed stream and it is exactly
> > the same as the original unkeyed stream. It still doesn't trigger windows:
> > ```java
> > DataStream<Trip> trips = env.addSource(consumer);
> > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> > DataStream<Trip> featurizedUserTrips =
> >     userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new
> >         BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >             @Override
> >             public long extractTimestamp(Trip trip) {
> >                 return trip.endTime.getTime();
> >             }
> >         });
> > AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > ```
> >
> > It makes no sense to me. Please help me understand why it doesn't work.
> > Thanks!
> >
> > On 2019/04/19 04:14:31, Guowei Ma  wrote:
> > > Hi,
> > > After keyBy, maybe only some of the
> > > BoundedOutOfOrdernessTimestampExtractors receive the elements (trips).
> > > If that is the case, a BoundedOutOfOrdernessTimestampExtractor which does
> > > not receive elements will not send a WM, and consequently the
> > > timeWindowAll operator cannot be triggered.
> > > You could add a shuffle() before the assignTimestampsAndWatermarks in your
> > > second case and check whether the window is triggered. If it is triggered,
> > > you could check the distribution of the elements generated by the source.
> > >
> > > Best,
> > > Guowei
> > >
> > >
> > > an0...@gmail.com wrote on Friday, 2019-04-19 at 4:10 AM:
> > >
> > > > I don't think it is the watermark. I see the same watermarks from the
> > > > two versions of the code.
> > > >
> > > > The processing on the keyed stream doesn't change event time at all. I
> > > > can simply change my code to use `map` on the keyed stream to return
> > > > the input data, so that the window operator receives exactly the same
> > > > data. The only difference is when I do `assignTimestampsAndWatermarks`.
> > > > The result is the same: `assignTimestampsAndWatermarks` before `keyBy`
> > > > works:
> > > > ```java
> > > > DataStream<Trip> trips =
> > > >     env.addSource(consumer).assignTimestampsAndWatermarks(new
> > > >         BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >             @Override
> > > >             public long extractTimestamp(Trip trip) {
> > > >                 return trip.endTime.getTime();
> > > >             }
> > > >         });
> > > > KeyedStream<Trip, ?> userTrips = trips.keyBy(trip -> trip.userId);
> > > > DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> > > > AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > >     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > ```
> > > >
> > > > `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> > > > ```java
> > > > DataStream<Trip> trips = env.addSource(consumer);
> > > > KeyedStream<Trip, ?> userTrips = trips.keyBy(trip -> trip.userId);
> > > > DataStream<Trip> featurizedUserTrips =
> > > >     userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new
> > > >         BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >

How to pass application name when using FlinkKinesisConsumer

2019-04-22 Thread Xixi Li


Hi

I have a question about how we can set up a Kinesis consumer with a
specified applicationName. We are currently using flink-connector-kinesis
version 1.5, but we can't find anywhere to set up the applicationName. Thank
you very much!

Regards,
Xixi



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: status on FLINK-7129

2019-04-22 Thread Konstantin Knauf
Hi Kant,

as far as I know, no one is currently working on this. Dawid (cc) maybe
knows more.

Cheers,

Konstantin

On Sat, Apr 20, 2019 at 12:12 PM kant kodali  wrote:

> Hi All,
>
> There seems to be a lot of interest for
> https://issues.apache.org/jira/browse/FLINK-7129
>
> Any rough idea on the status of this issue?
>
> Thanks!
>


-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525


Planned Absences: 17.04.2019 - 26.04.2019




Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Constant backpressure on flink job

2019-04-22 Thread Monika Hristova
Hello,

We are experiencing regular backpressure (at least once a week). I would like 
to ask how we can fix it.

Currently our configurations are:
vi /usr/lib/flink/conf/flink-conf.yaml
# Settings applied by Cloud Dataproc initialization action
jobmanager.rpc.address: bonusengine-prod-m-0
jobmanager.heap.mb: 4096
jobmanager.rpc.port: 6123
taskmanager.heap.mb: 4096
taskmanager.memory.preallocate: false
taskmanager.numberOfTaskSlots: 8
#taskmanager.network.numberOfBuffers: 21952 # legacy deprecated
taskmanager.network.memory.fraction: 0.9
taskmanager.network.memory.min: 67108864
taskmanager.network.memory.max: 1073741824
taskmanager.memory.segment-size: 65536
parallelism.default: 52
web.port: 8081
web.timeout: 12
heartbeat.interval: 1
heartbeat.timeout: 10
yarn.application-attempts: 10
high-availability: zookeeper
high-availability.zookeeper.quorum: 
bonusengine-prod-m-0:2181,bonusengine-prod-m-1:2181,bonusengine-prod-m-2:2181
high-availability.zookeeper.path.root: /flink
#high-availability.zookeeper.storageDir: hdfs:///flink/recovery # legacy 
deprecated
high-availability.storageDir: hdfs:///flink/recovery
flink.partition-discovery.interval-millis=6
fs.hdfs.hadoopconf: /etc/hadoop/conf
state.backend: rocksdb
state.checkpoints.dir: hdfs:///bonusengine/checkpoints/
state.savepoints.dir: hdfs:///bonusengine/savepoints
metrics.reporters: stsd
metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: 127.0.0.1
metrics.reporter.stsd.port: 8125
zookeeper.sasl.disable: true
yarn.reallocate-failed: true
yarn.maximum-failed-containers: 32
web.backpressure.refresh-interval: 6
web.backpressure.num-samples: 100
web.backpressure.delay-between-samples: 50

with a Hadoop HA cluster: masters -> 8 vCPUs, 7.2 GB; slaves -> 16 vCPUs, 60
GB; YARN configuration in the attached file.

We have one YARN session started, in which 8 jobs run. Three of them consume
the same source (Kafka), which is causing the backpressure, but only one of
them experiences backpressure. The state of the job is 20-something MB, and
checkpointing is configured as follows:

checkpointing.interval=30
# makes sure value in ms of progress happens between checkpoints
checkpointing.pause_between_checkpointing=24
# checkpoints have to complete within value in ms or are discarded
checkpointing.timeout=6
# allows given number of checkpoints to be in progress at the same time
checkpointing.max_concurrent_checkpoints=1
# enables/disables externalized checkpoints which are retained after job cancellation
checkpointing.externalized_checkpoints.enabled=true

The checkpointing pause was increased and the timeout reduced on multiple
occasions, as the job kept failing, unable to recover from backpressure. RocksDB
is the configured state backend. The problem keeps reproducing even with a
one-minute timeout. I would also like to point out that the ideal checkpointing
interval for that job would be 2 min.
I would like to ask what might be the problem, or at least how to trace it?

Best Regards,
Monika Hristova

Get Outlook for Android



yarn-site.xml
Description: yarn-site.xml


Re: Organize env using files

2019-04-22 Thread Rafi Aroch
Hi,

If it helps, we're using Lightbend's Config for that:

* https://github.com/lightbend/config
*
https://www.stubbornjava.com/posts/environment-aware-configuration-with-typesafe-config
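
For instance, a minimal environment-aware loading sketch (the file names and
the "env" system property below are hypothetical):

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class AppConfig {
    // Loads e.g. staging.conf (selected via -Denv=staging) and falls back
    // to the shared defaults in application.conf for all other keys.
    public static Config load() {
        String env = System.getProperty("env", "test");
        return ConfigFactory.parseResources(env + ".conf")
                .withFallback(ConfigFactory.load())
                .resolve();
    }
}

// Usage anywhere in a job, e.g.:
//   String kafkaHost = AppConfig.load().getString("kafka.host");
```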

Thanks,
Rafi


On Wed, Apr 17, 2019 at 7:07 AM Andy Hoang  wrote:

> I have 3 different files for envs: test, staging and production. Each of
> those has different parameters, like the Kafka host, endpoint URLs, and Redis
> connection host…
> I read about
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/best_practices.html#register-the-parameters-globally,
> but it has a few downsides: if I have a model class (for transformation) and
> want to use an env variable, I have to pass it in the constructor (because
> the global parameters can only be accessed inside the context).
>
> I wonder if there is a better solution for accessing the env anywhere in
> our jobs? I would be fine with having a class storing all the env values if
> I could somehow specify the env as “prod” and init a ProdEnv class… I’m
> looking for any ideas here :)
>
> Thanks a bunch,
>
> Andy
>


Re: Error restoring from checkpoint on Flink 1.8

2019-04-22 Thread Ning Shi
Congxian,

Thanks for the reply. I will try to get a minimal reproducer and post it to this
thread soon.

Ning

On Sun, 21 Apr 2019 09:27:12 -0400,
Congxian Qiu wrote:
> 
> Hi,
> From the given error message, it seems Flink can't open RocksDB because
> of a mismatch in the number of column families. Do you mind sharing a minimal
> job which can reproduce this problem?
> 
> Best,
> Congxian


Re: How autoscaling works on Kinesis Data Analytics for Java ?

2019-04-22 Thread Maxim Parkachov
Hi,

Answering myself in case someone else is interested as well. As per
https://aws.amazon.com/blogs/big-data/build-and-run-streaming-applications-with-apache-flink-and-amazon-kinesis-data-analytics-for-java-applications/,
it does autoscale itself, but in order to change parallelism it takes a
snapshot and restarts the streaming job; so no magic here, just nicely
automated.

Regards,
Maxim.

On Tue, Jan 29, 2019 at 5:23 AM Maxim Parkachov 
wrote:

> Hi,
>
> I had the impression that in order to change parallelism, one needs to stop
> the Flink streaming job and re-start it with new settings.
>
> According to
> https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-scaling.html
> auto-scaling works out of the box. Could someone with experience running
> Flink on AWS Kinesis Data Analytics for Java give a short explanation?
>
> Thanks in advance.
> Maxim.
>