Re: problem with avro serialization

2019-08-30 Thread Debasish Ghosh
From https://stackoverflow.com/a/56104518 ..

> AFAIK the only solution is to update Flink to use avro's
> non-reflection-based constructors in AvroInputFormat (compare the
> sources linked in that answer).


Would love to know if there have been any plans towards fixing this issue ..
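
To illustrate the root cause: Avro's SpecificData resolves the schema through a
static SCHEMA$ field on the record class itself, while avrohugger puts SCHEMA$
in the companion object, which compiles to the separate module class Data$. A
minimal sketch (my illustration, using the Data class from my earlier mail
below):

import pipelines.flink.avro.Data

object SchemaLookupIllustration extends App {
  // Avro's reflective lookup, which fails for avrohugger-generated classes:
  try classOf[Data].getDeclaredField("SCHEMA$")
  catch { case e: NoSuchFieldException => println(s"no static SCHEMA$$ on Data: $e") }

  // From Scala code the schema is reachable via the companion object:
  println(Data.SCHEMA$)
}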

regards.

On Thu, Aug 29, 2019 at 8:23 PM Debasish Ghosh 
wrote:

> Any update on this ?
>
> regards.
>
> On Tue, May 14, 2019 at 2:22 PM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi,
>>
>> Aljoscha opened a JIRA just recently for this issue:
>> https://issues.apache.org/jira/browse/FLINK-12501.
>>
>> Do you know if this is a regression from previous Flink versions?
>> I'm asking just to double check, since from my understanding of the
>> issue, the problem should have already existed before.
>>
>> Thanks,
>> Gordon
>>
>> On Sun, May 12, 2019 at 3:53 PM Debasish Ghosh 
>> wrote:
>>
>>> Hello -
>>>
>>> Facing an issue with avro serialization with Scala case classes
>>> generated through avrohugger ..
>>> Scala case classes generated by avrohugger have the avro schema in the
>>> companion object. This is a sample generated class (details elided) ..
>>>
>>> case class Data(var id: Int, var name: String) extends
>>> org.apache.avro.specific.SpecificRecordBase {
>>>   def this() = this(0, "")
>>>   def get(field$: Int): AnyRef = {
>>> //..
>>>   }
>>>   def put(field$: Int, value: Any): Unit = {
>>> //..
>>>   }
>>>   def getSchema(): org.apache.avro.Schema = Data.SCHEMA$
>>> }
>>> object Data {
>>>   val SCHEMA$ = new
>>> org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Data\",\"namespace\":\"pipelines.flink.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}")
>>> }
>>>
>>> Flink 1.8's avro serializer fails on this as Avro looks for a SCHEMA$
>>> property in the class & is unable to use Java reflection to identify the
>>> SCHEMA$ in the companion object. The exception that I get is the
>>> following ..
>>>
>>> java.lang.RuntimeException: Serializing the source elements failed:
>>> avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException:
>>> org.apache.avro.AvroRuntimeException: Not a Specific class: class
>>> pipelines.flink.avro.Data
>>>
>>>
>>> Any help or workaround will be appreciated ..
>>>
>>> regards.
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>>
>>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


Re: checkpoint failure suddenly even state size less than 1 mb

2019-08-30 Thread Yun Tang
Hi Sushant

What confuses me is why the source task cannot complete its checkpoint within 3
minutes [1]. If no sub-task has ever completed the checkpoint, that means even
the source task cannot complete it, and the source task does not need to buffer
any data. From what I see, it might be blocked on acquiring the lock held by the
stream task's main thread while it processes elements [2]. Could you use jstack
to capture your Java process's threads to see what happened when the checkpoint
failed?

[1] 
https://github.com/sushantbprise/flink-dashboard/blob/master/failed-checkpointing/state2.png
[2] 
https://github.com/apache/flink/blob/ccc7eb431477059b32fb924104c17af953620c74/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L758
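
If attaching jstack to the process is inconvenient, here is a small sketch of
mine (standard JDK APIs only) that captures the same information from inside
the JVM:

import scala.collection.JavaConverters._

object ThreadDump extends App {
  // Print every thread's stack, similar in spirit to `jstack <pid>`,
  // to check whether a task is blocked on the checkpoint lock.
  Thread.getAllStackTraces.asScala.foreach { case (thread, frames) =>
    println(s"Thread '${thread.getName}' state=${thread.getState}")
    frames.foreach(frame => println(s"    at $frame"))
  }
}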

Best
Yun Tang

From: Sushant Sawant 
Sent: Tuesday, August 27, 2019 15:01
To: user 
Subject: Re: checkpoint failure suddenly even state size less than 1 mb

Hi team,
Anyone for help/suggestions? We have now stopped all input in Kafka; there is no
processing and no sink activity, but checkpointing is still failing.
Is it the case that once a checkpoint fails, it keeps failing forever until the
job is restarted?

Help appreciated.

Thanks & Regards,
Sushant Sawant

On 23 Aug 2019 12:56 p.m., "Sushant Sawant"
<sushantsawant7...@gmail.com> wrote:
Hi all,
I'm facing two issues which I believe are correlated.
1. Kafka source shows high back pressure.
2. Sudden checkpoint failure for an entire day until restart.

My job does following thing,
a. Read from Kafka
b. Asyncio to external system
c. Dumping in Cassandra, Elasticsearch

Checkpointing uses the file system state backend.
This Flink job is proven under high load,
around 5000 events/sec throughput.
But recently we scaled down parallelism since there wasn't any load in
production, and these issues started.

Please find the status shown by the Flink dashboard.
This GitHub folder contains images from when there was high back pressure and
checkpoint failure:
https://github.com/sushantbprise/flink-dashboard/tree/master/failed-checkpointing
and the after-restart "everything is fine" images are in this folder:
https://github.com/sushantbprise/flink-dashboard/tree/master/working-checkpointing

--
Could anyone point me in the right direction as to what could have gone wrong,
or how to troubleshoot this?


Thanks & Regards,
Sushant Sawant



Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-30 Thread Till Rohrmann
For point 2. there exists already a JIRA issue [1] and a PR. I hope that we
can merge it during this release cycle.

[1] https://issues.apache.org/jira/browse/FLINK-13184

Cheers,
Till

On Fri, Aug 30, 2019 at 4:06 AM SHI Xiaogang  wrote:

> Hi Datashov,
>
> We faced similar problems in our production clusters.
>
> Now both launching and stopping of containers are performed in the main
> thread of YarnResourceManager. As containers are launched and stopped one
> after another, it usually takes a long time to bootstrap large jobs. Things
> get worse when some node managers get lost: Yarn will retry many times to
> communicate with them, leading to heartbeat timeouts of TaskManagers.
>
> Following are some efforts we made to help Flink deal with large jobs.
>
> 1. We provision some common jars in all cluster nodes and ask our users
> not to include these jars in their uberjar. When containers bootstrap,
> these jars are added to the classpath via JVM options. That way, we can
> efficiently reduce the size of uberjars.
>
> 2. We deploy some asynchronous threads to launch and stop containers in
> YarnResourceManager. The bootstrap time can be noticeably reduced when
> launching a large number of containers. We'd like to contribute this to the
> community very soon.
>
> 3. We deploy a timeout timer for each launching container. If a task
> manager does not register in time after its container has been launched, a
> new container will be allocated and launched. That will lead to a certain
> waste of resources, but can reduce the effects caused by slow or
> problematic nodes.
>
> Now the community is considering the refactoring of the ResourceManager. I
> think that will be the time to improve its efficiency.
>
> Regards,
> Xiaogang
>
> Elkhan Dadashov wrote on Fri, Aug 30, 2019 at 7:10 AM:
>
>> Dear Flink developers,
>>
>> I'm having difficulty getting a Flink job started.
>>
>> The job's uberjar/fat jar is around 400MB, and I need to kick off 800+
>> containers.
>>
>> The default HDFS replication is 3.
>>
>> *The Yarn queue is empty, and 800 containers are allocated
>> almost immediately by the Yarn RM.*
>>
>> It takes a very long time until all 800 nodes (node managers) download the
>> uberjar from HDFS to their local machines.
>>
>> *Q1:*
>>
>> a) Do all those 800 nodes download in batches of 3 at a time? (batch
>> size = HDFS replication factor)
>>
>> b) Or can Flink TMs replicate from each other? Or do already-started
>> TMs replicate to not-yet-started nodes?
>>
>> Most probably the answer is (a), but I want to confirm.
>>
>> *Q2:*
>>
>> What  is the recommended way of handling  400MB+ Uberjar with 800+
>> containers ?
>>
>> Any specific params to tune?
>>
>> Thanks.
>>
>> Because downloading the uberjar takes a really long time, around 15
>> minutes after the job kicked off, I am facing this exception:
>>
>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
>> start container.
>> This token is expired. current time is 1567116179193 found 1567116001610
>> Note: System times on machines may be out of sync. Check system time and 
>> time zones.
>>  at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown 
>> Source)
>>  at 
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>  at 
>> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>>  at 
>> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>>  at 
>> org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>>  at 
>> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>>  at 
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>>  at 
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>>  at 
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>  at 
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>  at 
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>  at 
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>  at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>  at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>  at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>  at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>  at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>

Re: Flink 1.9, MapR secure cluster, high availability

2019-08-30 Thread Maxim Parkachov
Hi Stephan,

With previous versions (I tried around 1.7), I always had to compile the MapR
Hadoop dependencies to get it working.
With 1.9 I took the Hadoop-less Flink, which worked with MapR FS until I
switched on HA.
So it is hard to say if this is a regression or not.

The error happens when Flink tries to initialize BLOB storage on MapR FS.
Without HA it takes the zookeeper from the classpath (MapR's
org.apache.zookeeper), and with HA it takes the shaded one.

After fixing a couple of issues with the pom, I was able to compile Flink with
the MapR zookeeper, and now when I start in HA mode it uses the shaded
zookeeper (which is now MapR's) to initialize the BLOB storage and
org.apache.zookeeper (which is MapR's as well) for HA recovery.

It works, but I was expecting it to work without compiling the MapR
dependencies.

Hope this helps,
Maxim.

On Thu, Aug 29, 2019 at 7:00 PM Stephan Ewen  wrote:

> Hi Maxim!
>
> The change of the MapR dependency should not have an impact on that.
> Do you know if the same thing worked in prior Flink versions? Is that a
> regression in 1.9?
>
> The exception that you report, is that from Flink's HA services trying to
> connect to ZK, or from the MapR FS client trying to connect to ZK?
>
> Best,
> Stephan
>
>
> On Tue, Aug 27, 2019 at 11:03 AM Maxim Parkachov 
> wrote:
>
>> Hi everyone,
>>
>> I'm testing release 1.9 on MapR secure cluster. I took flink binaries
>> from download page and trying to start Yarn session cluster. All MapR
>> specific libraries and configs are added according to documentation.
>>
>> When I start yarn-session without high availability, it uses zookeeper
>> from MapR distribution (org.apache.zookeeper) and correctly connects to
>> cluster and access to maprfs works as expected.
>>
>> But if I add zookeeper as the high-availability option, instead of the MapR
>> zookeeper it tries to use the shaded zookeeper, and this one cannot connect
>> with MapR credentials:
>>
>> 2019-08-27 10:42:45,240 ERROR 
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient
>>   - An error: (java.security.PrivilegedActionException: 
>> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
>> GSSException: No valid credentials provided (Mechanism level: Failed to find 
>> any Kerberos tgt)]) occurred when evaluating Zookeeper Quorum Member's  
>> received SASL token. Zookeeper Client will go to AUTH_FAILED state.
>> 2019-08-27 10:42:45,240 ERROR 
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL 
>> authentication with Zookeeper Quorum member failed: 
>> javax.security.sasl.SaslException: An error: 
>> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
>> GSS initiate failed [Caused by GSSException: No valid credentials provided 
>> (Mechanism level: Failed to find any Kerberos tgt)]) occurred when 
>> evaluating Zookeeper Quorum Member's  received SASL token. Zookeeper Client 
>> will go to AUTH_FAILED state.
>> I tried to use a separate zookeeper cluster for HA, but maprfs still doesn't
>> work.
>>
>> Is this related to the removal of MapR-specific settings in release 1.9?
>> Should I still compile a custom version of Flink with MapR dependencies?
>> (trying to do that now, but getting some errors during compilation).
>>
>> Can I somehow force Flink to use the MapR zookeeper even in HA mode?
>>
>> Thanks in advance,
>> Maxim.
>>
>>


Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-30 Thread Jeff Zhang
I can think of 2 approaches:

1. Allow Flink to specify the replication factor of the submitted uber jar
(see the sketch below).
2. Allow Flink to specify a config flink.yarn.lib pointing to all the
Flink-related jars hosted on HDFS. This way users don't need to build
and submit a fat uber jar every time, and those Flink jars hosted on HDFS
can also have their replication specified separately.
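
For approach 1, a minimal sketch of what this could look like today with the
standard Hadoop FileSystem API (the jar path and the replication value below
are hypothetical placeholders):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object RaiseJarReplication extends App {
  val fs = FileSystem.get(new Configuration())
  // Hypothetical path: substitute the actual upload location of the uberjar.
  val uberJar = new Path("hdfs:///user/flink/jobs/my-job-uberjar.jar")
  // Raise replication so hundreds of node managers can download in parallel.
  fs.setReplication(uberJar, 50.toShort)
}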



Till Rohrmann wrote on Fri, Aug 30, 2019 at 3:33 PM:

> For point 2. there exists already a JIRA issue [1] and a PR. I hope that
> we can merge it during this release cycle.
>
> [1] https://issues.apache.org/jira/browse/FLINK-13184
>
> Cheers,
> Till
>
> On Fri, Aug 30, 2019 at 4:06 AM SHI Xiaogang 
> wrote:
>
>> Hi Datashov,
>>
>> We faced similar problems in our production clusters.
>>
>> Now both launching and stopping of containers are performed in the main
>> thread of YarnResourceManager. As containers are launched and stopped one
>> after another, it usually takes a long time to bootstrap large jobs. Things
>> get worse when some node managers get lost: Yarn will retry many times to
>> communicate with them, leading to heartbeat timeouts of TaskManagers.
>>
>> Following are some efforts we made to help Flink deal with large jobs.
>>
>> 1. We provision some common jars in all cluster nodes and ask our users
>> not to include these jars in their uberjar. When containers bootstrap,
>> these jars are added to the classpath via JVM options. That way, we can
>> efficiently reduce the size of uberjars.
>>
>> 2. We deploy some asynchronous threads to launch and stop containers in
>> YarnResourceManager. The bootstrap time can be noticeably reduced when
>> launching a large number of containers. We'd like to contribute this to the
>> community very soon.
>>
>> 3. We deploy a timeout timer for each launching container. If a task
>> manager does not register in time after its container has been launched, a
>> new container will be allocated and launched. That will lead to a certain
>> waste of resources, but can reduce the effects caused by slow or
>> problematic nodes.
>>
>> Now the community is considering the refactoring of the ResourceManager. I
>> think that will be the time to improve its efficiency.
>>
>> Regards,
>> Xiaogang
>>
>> Elkhan Dadashov wrote on Fri, Aug 30, 2019 at 7:10 AM:
>>
>>> Dear Flink developers,
>>>
>>> I'm having difficulty getting a Flink job started.
>>>
>>> The job's uberjar/fat jar is around 400MB, and I need to kick off 800+
>>> containers.
>>>
>>> The default HDFS replication is 3.
>>>
>>> *The Yarn queue is empty, and 800 containers are allocated
>>> almost immediately by the Yarn RM.*
>>>
>>> It takes a very long time until all 800 nodes (node managers) download
>>> the uberjar from HDFS to their local machines.
>>>
>>> *Q1:*
>>>
>>> a) Do all those 800 nodes download in batches of 3 at a time? (batch
>>> size = HDFS replication factor)
>>>
>>> b) Or can Flink TMs replicate from each other? Or do already-started
>>> TMs replicate to not-yet-started nodes?
>>>
>>> Most probably the answer is (a), but I want to confirm.
>>>
>>> *Q2:*
>>>
>>> What  is the recommended way of handling  400MB+ Uberjar with 800+
>>> containers ?
>>>
>>> Any specific params to tune?
>>>
>>> Thanks.
>>>
>>> Because downloading the uberjar takes a really long time, around
>>> 15 minutes after the job kicked off, I am facing this exception:
>>>
>>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
>>> start container.
>>> This token is expired. current time is 1567116179193 found 1567116001610
>>> Note: System times on machines may be out of sync. Check system time and 
>>> time zones.
>>> at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown 
>>> Source)
>>> at 
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>> at 
>>> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>>> at 
>>> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>>> at 
>>> org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>>> at 
>>> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>> at 
>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>> at akk

Re: Using Avro SpecficRecord serialization instead of slower ReflectDatumWriter/GenericDatumWriter

2019-08-30 Thread Till Rohrmann
Hi Roshan,

these kinds of questions should be posted to Flink's user mailing list. I've
cross-posted it now.

If you are using Flink's latest version and your type extends
`SpecificRecord`, then Flink's AvroSerializer should use the
`SpecificDatumWriter`. If this is not the case, then this sounds like a
bug. Could you maybe provide us with a bit more detail about the Flink
version you are using and the actual job you are executing? Ideally, you could
link a git repo which contains an example to reproduce the problem.
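
In the meantime, a sketch of how to nudge Flink towards the Avro-specific path
explicitly (not a confirmed fix; MyRecord is a hypothetical stand-in for your
generated SpecificRecordBase subclass):

object AvroSetup {
  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.formats.avro.typeutils.AvroTypeInfo
  import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.getConfig.enableForceAvro() // prefer Avro over Kryo where applicable

  // Wiring the type information explicitly avoids the reflection-based fallback.
  implicit val recordTypeInfo: TypeInformation[MyRecord] =
    new AvroTypeInfo(classOf[MyRecord])
}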

Cheers,
Till

On Fri, Aug 30, 2019 at 5:55 AM Roshan Naik 
wrote:

> Noticing that Flink takes very long inside collect(..) due to Avro
> serialization that relies on ReflectDatumWriter & GenericDatumWriter.
> The object being serialized here is an Avro object that implements
> SpecificRecordBase. It is somewhat large (~50Kb) and complex.
>
> Looking for a way to use SpecificDatumWriter for the serialization instead
> of the generic/reflection based stuff to speed it up. But don't see a way
> to influence that change.
>
>
>
>
>
>
>
>
>


best practices on getting flink job logs from Hadoop history server?

2019-08-30 Thread Yu Yang
Hi,

We run Flink jobs through YARN on Hadoop clusters. One challenge that we
are facing is simplifying Flink job log access.

The Flink job logs can be accessed using "yarn logs $application_id".
That approach has a few limitations:

   1. It is not straightforward to find yarn application id based on flink
   job id.
   2. It is difficult to find the corresponding container id for the flink
   sub tasks.
   3. For jobs that have many tasks, it is inefficient to use "yarn logs
   ..."  as it mixes logs from all task managers.

Any suggestions on the best practice for getting logs for completed Flink jobs
that ran on YARN?
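
For 1., the best we have come up with is a sketch like the following (standard
YARN client API; that all Flink jobs carry the application type below is an
assumption), which we would rather not have to hand-roll:

import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import scala.collection.JavaConverters._

object ListFlinkApps extends App {
  val yarn = YarnClient.createYarnClient()
  yarn.init(new YarnConfiguration())
  yarn.start()
  // Match YARN application ids to Flink jobs by application name.
  yarn.getApplications.asScala
    .filter(_.getApplicationType == "Apache Flink") // assumed application type
    .foreach(app => println(s"${app.getApplicationId}  ${app.getName}"))
  yarn.stop()
}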

Regards,
-Yu


Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-30 Thread Jörn Franke
Increase the replication factor and/or use the HDFS cache:
https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html
Try to reduce the size of the jar, e.g. the Flink libraries do not need to be
included.

> On 30.08.2019, at 01:09, Elkhan Dadashov wrote:
> 
> Dear Flink developers,
> 
> I'm having difficulty getting a Flink job started.
> 
> The job's uberjar/fat jar is around 400MB, and I need to kick off 800+
> containers.
> 
> The default HDFS replication is 3.
> 
> The Yarn queue is empty, and 800 containers are allocated almost
> immediately by the Yarn RM.
> 
> It takes a very long time until all 800 nodes (node managers) download the
> uberjar from HDFS to their local machines.
> 
> Q1:
> 
> a) Do all those 800 nodes download in batches of 3 at a time? (batch size
> = HDFS replication factor)
> 
> b) Or can Flink TMs replicate from each other? Or do already-started
> TMs replicate to not-yet-started nodes?
> 
> Most probably the answer is (a), but I want to confirm.
> 
> Q2:
> 
> What  is the recommended way of handling  400MB+ Uberjar with 800+ containers 
> ?
> 
> Any specific params to tune?
> 
> Thanks.
> 
> Because downloading the uberjar takes a really long time, around 15
> minutes after the job kicked off, I am facing this exception:
> 
> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
> start container.
> This token is expired. current time is 1567116179193 found 1567116001610
> Note: System times on machines may be out of sync. Check system time and time 
> zones.
>   at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown 
> Source)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>   at 
> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>   at 
> org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>   at 
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>   at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
> 


Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-30 Thread Zhu Zhu
One optimization that we use is letting YARN reuse the flink-dist jar
that was localized when running previous jobs.

Thanks,
Zhu Zhu

Jörn Franke wrote on Fri, Aug 30, 2019 at 4:02 PM:

> Increase the replication factor and/or use the HDFS cache:
> https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html
> Try to reduce the size of the jar, e.g. the Flink libraries do not need to
> be included.
>
> On 30.08.2019, at 01:09, Elkhan Dadashov wrote:
>
> Dear Flink developers,
>
> I'm having difficulty getting a Flink job started.
>
> The job's uberjar/fat jar is around 400MB, and I need to kick off 800+
> containers.
>
> The default HDFS replication is 3.
>
> *The Yarn queue is empty, and 800 containers are allocated
> almost immediately by the Yarn RM.*
>
> It takes a very long time until all 800 nodes (node managers) download the
> uberjar from HDFS to their local machines.
>
> *Q1:*
>
> a) Do all those 800 nodes download in batches of 3 at a time? (batch
> size = HDFS replication factor)
>
> b) Or can Flink TMs replicate from each other? Or do already-started
> TMs replicate to not-yet-started nodes?
>
> Most probably the answer is (a), but I want to confirm.
>
> *Q2:*
>
> What  is the recommended way of handling  400MB+ Uberjar with 800+
> containers ?
>
> Any specific params to tune?
>
> Thanks.
>
> Because downloading the uberjar takes a really long time, around 15
> minutes after the job kicked off, I am facing this exception:
>
> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
> start container.
> This token is expired. current time is 1567116179193 found 1567116001610
> Note: System times on machines may be out of sync. Check system time and time 
> zones.
>   at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown 
> Source)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>   at 
> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>   at 
> org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>   at 
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>   at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
>


Re: best practices on getting flink job logs from Hadoop history server?

2019-08-30 Thread Zhu Zhu
Hi Yu,

Regarding #2,
Currently we search for the task deployment log entries in the JM log, which
contain info about the container and machine each task is deployed to.

Regarding #3,
You can find the application logs aggregated per machine on DFS; the path
depends on your YARN config. Each log may still include multiple TM logs;
however, it can be much smaller than the log generated by "yarn logs ...".

Thanks,
Zhu Zhu

Yu Yang wrote on Fri, Aug 30, 2019 at 3:58 PM:

> Hi,
>
> We run Flink jobs through YARN on Hadoop clusters. One challenge that we
> are facing is simplifying Flink job log access.
>
> The Flink job logs can be accessed using "yarn logs $application_id".
> That approach has a few limitations:
>
>1. It is not straightforward to find yarn application id based on
>flink job id.
>2. It is difficult to find the corresponding container id for the
>flink sub tasks.
>3. For jobs that have many tasks, it is inefficient to use "yarn logs
>..."  as it mixes logs from all task managers.
>
> Any suggestions on the best practice for getting logs for completed Flink
> jobs that ran on YARN?
>
> Regards,
> -Yu
>
>
>


Re: Error while using catalog in .yaml file

2019-08-30 Thread Yebgenya Lazarkhosrouabadi
Hello,

I built Flink from source and have the flink-connector-hive jar file now. I
copied this file to the lib directory of Flink, but I still get the same error
when I try to run ./sql-client.sh embedded:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
The configured environment is invalid. Please check your environment files 
again.
   at 
org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
   at 
org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
   at 
org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could 
not create execution context.
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:553)
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:373)
   at 
org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
   ... 2 more
Caused by: java.lang.NoClassDefFoundError: 
org/apache/hive/common/util/HiveVersionInfo
   at 
org.apache.flink.table.catalog.hive.client.HiveShimLoader.getHiveVersion(HiveShimLoader.java:58)
   at 
org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory.createCatalog(HiveCatalogFactory.java:82)
   at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:259)
   at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$0(ExecutionContext.java:136)
   at java.util.HashMap.forEach(HashMap.java:1289)
   at 
org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:135)
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:549)
   ... 4 more
Caused by: java.lang.ClassNotFoundException: 
org.apache.hive.common.util.HiveVersionInfo
   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)




I have this information for the Classpath in the log file:

2019-08-28 20:06:42,278 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Classpath: 
:/home/user/Dokumente/flink-1.9.0/flink-dist/target/flink-1.9.0-bin/flink-1.9.0/lib/flink-connector-hive_2.11-1.9.0.jar:/home/user/Dokumente/flink-1.9.0/flink-dist/target/flink-1.9.0-bin/flink-1.9.0/lib/flink-dist_2.11-1.9.0.jar:/home/user/Dokumente/flink-1.9.0/flink-dist/target/flink-1.9.0-bin/flink-1.9.0/lib/flink-table_2.11-1.9.0.jar:/home/user/Dokumente/flink-1.9.0/flink-dist/target/flink-1.9.0-bin/flink-1.9.0/lib/flink-table-blink_2.11-1.9.0.jar:/home/user/Dokumente/flink-1.9.0/flink-dist/target/flink-1.9.0-bin/flink-1.9.0/lib/log4j-1.2.17.jar:/home/user/Dokumente/flink-1.9.0/flink-dist/target/flink-1.9.0-bin/flink-1.9.0/lib/slf4j-log4j12-1.7.15.jar::/usr/local/hadoop/hadoop-2.7.7/etc/hadoop:


The configuration of the catalog in the sql-client-defaults.yaml is like this:

catalogs:
  - name: mynewhive
type: hive
property-version: 1
hive-conf-dir: /home/bernadette/Downloads/apache-hive-1.2.2-bin/conf
hive-version: 1.2.1


I get no error when I remove these from the yaml file.
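
For reference, here is a sketch of registering the same catalog
programmatically via the Table API in Flink 1.9 (tableEnv is assumed to be an
existing TableEnvironment). As far as I understand, it has the same classpath
requirement: flink-connector-hive plus the Hive jars containing
HiveVersionInfo must be on the classpath.

import org.apache.flink.table.catalog.hive.HiveCatalog

val catalog = new HiveCatalog(
  "mynewhive",                                             // catalog name
  "myhive",                                                // default database
  "/home/bernadette/Downloads/apache-hive-1.2.2-bin/conf", // hive-conf-dir
  "1.2.1")                                                 // hive-version
tableEnv.registerCatalog("mynewhive", catalog)
tableEnv.useCatalog("mynewhive")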


I look forward to hearing from you.

Regards
Yebgenya Lazar


Von: Bowen Li 
Gesendet: Montag, 26. August 2019 22:45
An: Yebgenya Lazarkhosrouabadi 
Cc: user@flink.apache.org
Betreff: Re: Error while using catalog in .yaml file

Put flink-connector-hive jar in classpath



On Sun, Aug 25, 2019 at 9:14 AM Yebgenya Lazarkhosrouabadi
<lazarkhosrouab...@integration-factory.de> wrote:
Hello,

I'm trying to use the HiveCatalog in Flink 1.9. I modified the yaml file like this:


catalogs:
  - name: mynewhive
type: hive
hive-conf-dir: /home/user/Downloads/apache-hive-1.2.2-bin/conf
default-database: myhive


But when I try to run ./sql-client.sh embedded  I get this error:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
The configured environment is invalid. Please check your environment files 
again.
   at 
org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
   at 
org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
   at 
org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could 
not create execution context.
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:553)
   at 
org.apache.flink

Re: problem with avro serialization

2019-08-30 Thread Aljoscha Krettek
Hi,

I cut a PR that should fix this issue for Avrohugger: 
https://github.com/apache/flink/pull/9565 


Would you be able to build this and see if it solves your problem?

Best,
Aljoscha

> On 30. Aug 2019, at 09:02, Debasish Ghosh  wrote:
> 
> From https://stackoverflow.com/a/56104518 ..
>
> AFAIK the only solution is to update Flink to use avro's non-reflection-based
> constructors in AvroInputFormat (compare the sources linked in that answer).
> 
> Would love to know if there have been any plans towards fixing this issue ..
> 
> regards. 
> 
> On Thu, Aug 29, 2019 at 8:23 PM Debasish Ghosh  > wrote:
> Any update on this ? 
> 
> regards.
> 
> On Tue, May 14, 2019 at 2:22 PM Tzu-Li (Gordon) Tai  > wrote:
> Hi,
> 
> Aljoscha opened a JIRA just recently for this issue: 
> https://issues.apache.org/jira/browse/FLINK-12501 
> .
> 
> Do you know if this is a regression from previous Flink versions?
> I'm asking just to double check, since from my understanding of the issue, 
> the problem should have already existed before.
> 
> Thanks,
> Gordon
> 
> On Sun, May 12, 2019 at 3:53 PM Debasish Ghosh  > wrote:
> Hello -
> 
> Facing an issue with avro serialization with Scala case classes generated 
> through avrohugger ..
> Scala case classes generated by avrohugger have the avro schema in the
> companion object. This is a sample generated class (details elided) ..
> 
> case class Data(var id: Int, var name: String) extends 
> org.apache.avro.specific.SpecificRecordBase {
>   def this() = this(0, "")
>   def get(field$: Int): AnyRef = {
> //..
>   }
>   def put(field$: Int, value: Any): Unit = {
> //..
>   }
>   def getSchema(): org.apache.avro.Schema = Data.SCHEMA$
> }
> object Data {
>   val SCHEMA$ = new 
> org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Data\",\"namespace\":\"pipelines.flink.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}")
> }
> 
> Flink 1.8's avro serializer fails on this as Avro looks for a SCHEMA$ property
> in the class & is unable to use Java reflection to identify the SCHEMA$ in the
> companion object. The exception that I get is the following ..
> 
> java.lang.RuntimeException: Serializing the source elements failed: 
> avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: 
> org.apache.avro.AvroRuntimeException: Not a Specific class: class 
> pipelines.flink.avro.Data
> 
> Any help or workaround will be appreciated ..
> 
> regards.
> 
> -- 
> Debasish Ghosh
> http://manning.com/ghosh2 
> http://manning.com/ghosh 
> 
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com 
> Code: http://github.com/debasishg 
> 
> -- 
> Debasish Ghosh
> http://manning.com/ghosh2 
> http://manning.com/ghosh 
> 
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com 
> Code: http://github.com/debasishg 
> 
> -- 
> Debasish Ghosh
> http://manning.com/ghosh2 
> http://manning.com/ghosh 
> 
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com 
> Code: http://github.com/debasishg 


Non incremental window function accumulates unbounded state with RocksDb

2019-08-30 Thread William Jonsson
Hello,
I have a Flink pipeline reading data from Kafka which is keyed (in the
pseudocode example below it is keyed on the first letter of the string; in our
pipeline it is keyed on a predefined key) and processed in sliding windows with
a duration of 60 minutes sliding every 10th minute. The time characteristic is
event time, and the windows process the data when the window fires; there is no
incremental processing of the windowed data.
When running with a Heap backend the state behaves "normally", i.e. it stays
within the data size that is expected when the windows have buffered the data
(~200 MB for this application) and is bounded to around this size independent
of the lifetime of the processing pipeline. However, if the state backend is
changed to the RocksDB backend, the state starts to grow indefinitely (that is
our observation; we haven't seen it stop growing at least) to 70-80 GB in just
over a month of runtime.
I made a savepoint of the state, downloaded it and analysed it, which showed
that the state consisted of data from the whole lifetime of the pipeline, of
about equal size for each day. I interpret this as the state having accumulated
old data which should have been deleted when the windows were cleared. It is
worth noting that the state consists of the input Strings only, so it should
have nothing to do with the histogram calculation.
I have tried to reduce level0_file_num_compaction_trigger to 1, and the base
file size as well as the target file size, to trigger more compactions in the
hope that the compactions would remove the obsolete data, which resulted in no
improvement at all (it basically got worse).
Please see the pseudocode below for a code example. The pipeline is more
complex than this and runs on other classes than String input and a
"histogram" output class. Do you have any input or ideas as to how the state
can be manageable in the Heap case but completely unmanageable with the
RocksDB version?
Best regards,
William

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.util.Collector

class HistogramWindowFunction extends WindowFunction[String, Histogram, Char, TimeWindow] {
  def process(key: Char, window: TimeWindow, input: Iterable[String]): Histogram = {
    //..calculate and return the histogram
  }
  override def apply(key: Char, window: TimeWindow, input: Iterable[String],
                     out: Collector[Histogram]): Unit = {
    out.collect(process(key, window, input))
  }
}

env.getCheckpointConfig.setCheckpointTimeout(40)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(45)
val stateBackend: StateBackend = new RocksDBStateBackend("s3://..", true)
env.setStateBackend(stateBackend)
env.enableCheckpointing(90)
val stream = env.addSource(
  new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
stream.keyBy(e => e.charAt(0))
  .timeWindow(Time.minutes(60), Time.minutes(10))
  .apply(new HistogramWindowFunction())
  .name("Pseudocode")
  .uid("Pseudocode")


William Jonsson
Systems Engineer
Fleet Perception for Maintenance
NIRA Dynamics AB
Wallenbergs Gata 4
583 30 Linköping
Sweden  Mobile: +46 722 178 247
william.jons...@niradynamics.se
www.niradynamics.se
Together for smarter safety



Re: best practices on getting flink job logs from Hadoop history server?

2019-08-30 Thread Yun Tang
Hi Yu

If you have the client job log, you can find your application id in the
description below:

The Flink YARN client has been started in detached mode. In order to stop Flink 
on YARN, use the following command or a YARN web interface to stop it:
yarn application -kill {appId}
Please also note that the temporary files of the YARN session in the home 
directory will not be removed.

Best
Yun Tang


From: Zhu Zhu 
Sent: Friday, August 30, 2019 16:24
To: Yu Yang 
Cc: user 
Subject: Re: best practices on getting flink job logs from Hadoop history 
server?

Hi Yu,

Regarding #2,
Currently we search for the task deployment log entries in the JM log, which
contain info about the container and machine each task is deployed to.

Regarding #3,
You can find the application logs aggregated per machine on DFS; the path
depends on your YARN config. Each log may still include multiple TM logs;
however, it can be much smaller than the log generated by "yarn logs ...".

Thanks,
Zhu Zhu

Yu Yang <yuyan...@gmail.com> wrote on Fri, Aug 30, 2019 at 3:58 PM:
Hi,

We run Flink jobs through YARN on Hadoop clusters. One challenge that we are
facing is simplifying Flink job log access.

The Flink job logs can be accessed using "yarn logs $application_id". That
approach has a few limitations:

  1.  It is not straightforward to find yarn application id based on flink job 
id.
  2.  It is difficult to find the corresponding container id for the flink sub 
tasks.
  3.  For jobs that have many tasks, it is inefficient to use "yarn logs ..."  
as it mixes logs from all task managers.

Any suggestions on the best practice for getting logs for completed Flink jobs
that ran on YARN?

Regards,
-Yu




Re: Non incremental window function accumulates unbounded state with RocksDb

2019-08-30 Thread Yun Tang
Hi William

I don't believe the same job would have 70~80GB of state for RocksDB while it's
only 200MB for the HeapStateBackend, even though RocksDB has some space
amplification. Are you sure the job received the same input throughput with the
different state backends, and that both ran well without any failover? Could you
take a savepoint of the job with each state backend and compare the sizes of the
savepoints? Also, what version of Flink did you use?

Best
Yun Tang

From: William Jonsson 
Sent: Friday, August 30, 2019 17:04
To: user@flink.apache.org 
Cc: Fleet Perception for Maintenance 

Subject: Non incremental window function accumulates unbounded state with 
RocksDb


Hello,

I have a Flink pipeline reading data from Kafka which is keyed (in the
pseudocode example below it is keyed on the first letter of the string; in our
pipeline it is keyed on a predefined key) and processed in sliding windows with
a duration of 60 minutes sliding every 10th minute. The time characteristic is
event time, and the windows process the data when the window fires; there is no
incremental processing of the windowed data.

When running with a Heap backend the state behaves "normally", i.e. it stays
within the data size that is expected when the windows have buffered the data
(~200 MB for this application) and is bounded to around this size independent
of the lifetime of the processing pipeline. However, if the state backend is
changed to the RocksDB backend, the state starts to grow indefinitely (that is
our observation; we haven't seen it stop growing at least) to 70-80 GB in just
over a month of runtime.

I made a savepoint of the state, downloaded it and analysed it, which showed
that the state consisted of data from the whole lifetime of the pipeline, of
about equal size for each day. I interpret this as the state having accumulated
old data which should have been deleted when the windows were cleared. It is
worth noting that the state consists of the input Strings only, so it should
have nothing to do with the histogram calculation.

I have tried to reduce level0_file_num_compaction_trigger to 1, and the base
file size as well as the target file size, to trigger more compactions in the
hope that the compactions would remove the obsolete data, which resulted in no
improvement at all (it basically got worse).

Please see the pseudocode below for a code example. The pipeline is more
complex than this and runs on other classes than String input and a
"histogram" output class. Do you have any input or ideas as to how the state
can be manageable in the Heap case but completely unmanageable with the
RocksDB version?

Best regards,

William



class HistogramWindowFunction extends WindowFunction[String, Histogram, Char, TimeWindow] {

  def process(key: Char, window: TimeWindow, input: Iterable[String]): Histogram = {
    //..calculate and return the histogram
  }

  override def apply(key: Char, window: TimeWindow, input: Iterable[String],
                     out: Collector[Histogram]): Unit = {
    out.collect(process(key, window, input))
  }
}

env.getCheckpointConfig.setCheckpointTimeout(40)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(45)
val stateBackend: StateBackend = new RocksDBStateBackend("s3://..", true)
env.setStateBackend(stateBackend)
env.enableCheckpointing(90)
val stream = env.addSource(
  new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
stream.keyBy(e => e.charAt(0))
  .timeWindow(Time.minutes(60), Time.minutes(10))
  .apply(new HistogramWindowFunction())
  .name("Pseudocode")
  .uid("Pseudocode")



William Jonsson
Systems Engineer
Fleet Perception for Maintenance
NIRA Dynamics AB
Wallenbergs Gata 4
583 30 Linköping
Sweden  Mobile: +46 722 178 247
william.jons...@niradynamics.se
www.niradynamics.se
Together for smarter safety


Re: Non incremental window function accumulates unbounded state with RocksDb

2019-08-30 Thread William Jonsson
Thanks for your answer Yun.

I agree, I don't believe that either; however, that's my empirical observation.
Those statistics are from savepoints. Basically the jobs are running against a
production Kafka, so no, not exactly the same input. However, these statistics
are from several runs distributed in time, so they should not contain temporal
effects. There are no failovers in the pipeline during runtime. Doing some
calculations on the size and the pace of the data in the pipeline (how often we
receive data and how big the datatype is) yields that the buffered data in the
windows should be a little less than 200 MB, so the Heap backend behaves
accordingly. I agree, the space amplification can't be a factor of 400 and still
continue growing for RocksDB. I've spent some time trying to figure out whether
we are doing anything obscure, but I can't find anything. So it would be
interesting to hear if anyone has had the same experience.

The pipeline is currently running on Flink 1.7.2

Best regards and wish you a pleasant day,
William

From: Yun Tang 
Date: Friday, 30 August 2019 at 11:42
To: William Jonsson , "user@flink.apache.org" 

Cc: Fleet Perception for Maintenance 

Subject: Re: Non incremental window function accumulates unbounded state with 
RocksDb

Hi William

I don't believe the same job would have 70~80GB of state for RocksDB while it's
only 200MB for the HeapStateBackend, even though RocksDB has some space
amplification. Are you sure the job received the same input throughput with the
different state backends, and that both ran well without any failover? Could you
take a savepoint of the job with each state backend and compare the sizes of the
savepoints? Also, what version of Flink did you use?

Best
Yun Tang

From: William Jonsson 
Sent: Friday, August 30, 2019 17:04
To: user@flink.apache.org 
Cc: Fleet Perception for Maintenance 

Subject: Non incremental window function accumulates unbounded state with 
RocksDb


Hello,

I have a Flink pipeline reading data from Kafka which is keyed (in the
pseudocode example below it is keyed on the first letter of the string; in our
pipeline it is keyed on a predefined key) and processed in sliding windows with
a duration of 60 minutes sliding every 10th minute. The time characteristic is
event time, and the windows process the data when the window fires; there is no
incremental processing of the windowed data.

When running with a Heap backend the state behaves "normally", i.e. it stays
within the data size that is expected when the windows have buffered the data
(~200 MB for this application) and is bounded to around this size independent
of the lifetime of the processing pipeline. However, if the state backend is
changed to the RocksDB backend, the state starts to grow indefinitely (that is
our observation; we haven't seen it stop growing at least) to 70-80 GB in just
over a month of runtime.

I made a savepoint of the state, downloaded it and analysed it, which showed
that the state consisted of data from the whole lifetime of the pipeline, of
about equal size for each day. I interpret this as the state having accumulated
old data which should have been deleted when the windows were cleared. It is
worth noting that the state consists of the input Strings only, so it should
have nothing to do with the histogram calculation.

I have tried to reduce level0_file_num_compaction_trigger to 1, and the base
file size as well as the target file size, to trigger more compactions in the
hope that the compactions would remove the obsolete data, which resulted in no
improvement at all (it basically got worse).

Please see the pseudocode below for a code example. The pipeline is more
complex than this and runs on other classes than String input and a
"histogram" output class. Do you have any input or ideas as to how the state
can be manageable in the Heap case but completely unmanageable with the
RocksDB version?

Best regards,

William



class HistogramWindowFunction extends WindowFunction[String, Histogram, Char, TimeWindow] {

  def process(key: Char, window: TimeWindow, input: Iterable[String]): Histogram = {
    //..calculate and return the histogram
  }

  override def apply(key: Char, window: TimeWindow, input: Iterable[String],
                     out: Collector[Histogram]): Unit = {
    out.collect(process(key, window, input))
  }
}

env.getCheckpointConfig.setCheckpointTimeout(40)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(45)
val stateBackend: StateBackend = new RocksDBStateBackend("s3://..", true)
env.setStateBackend(stateBackend)
env.enableCheckpointing(90)
val stream = env.addSource(
  new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
stream.keyBy(e => e.charAt(0))
  .timeWindow(Time.minutes(60), Time.minutes(10))
  .apply(new HistogramWindowFunction())
  .name("Pseudocode")
  .uid("Pseudocode")


William Jonsson
Systems Engineer
Fleet Perception for Maintenance

Re: Assigning UID to Flink SQL queries

2019-08-30 Thread Yuval Itzchakov
Anyone?

On Tue, 27 Aug 2019, 17:23 Yuval Itzchakov,  wrote:

> Hi,
>
> We have a bunch of Flink SQL queries running in our Flink environment. For
> regular Table API interactions, we can override `uid`, which also gives us
> an indicative name for the thread/UI to look at. For Flink SQL queries, this
> doesn't seem to be the case, which results in very verbose names (essentially
> the entire query plan) shown in the UI / thread names while debugging.
>
> Is there any convenient way to set / override the UID for SQL defined
> queries?
>
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: problem with avro serialization

2019-08-30 Thread Debasish Ghosh
Thanks a lot .. sure I can do a build with this PR and check.

regards.

On Fri, Aug 30, 2019 at 2:20 PM Aljoscha Krettek 
wrote:

> Hi,
>
> I cut a PR that should fix this issue for Avrohugger:
> https://github.com/apache/flink/pull/9565
>
> Would you be able to build this and see if it solves your problem?
>
> Best,
> Aljoscha
>
> On 30. Aug 2019, at 09:02, Debasish Ghosh 
> wrote:
>
> From https://stackoverflow.com/a/56104518  ..
>
>> AFAIK the only solution is to update Flink to use avro's
>> non-reflection-based constructors in AvroInputFormat (compare the
>> sources linked in that answer).
>
>
> Would love to know if there have been any plans towards fixing this issue
> ..
>
> regards.
>
> On Thu, Aug 29, 2019 at 8:23 PM Debasish Ghosh 
> wrote:
>
>> Any update on this ?
>>
>> regards.
>>
>> On Tue, May 14, 2019 at 2:22 PM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Hi,
>>>
>>> Aljoscha opened a JIRA just recently for this issue:
>>> https://issues.apache.org/jira/browse/FLINK-12501.
>>>
>>> Do you know if this is a regression from previous Flink versions?
>>> I'm asking just to double check, since from my understanding of the
>>> issue, the problem should have already existed before.
>>>
>>> Thanks,
>>> Gordon
>>>
>>> On Sun, May 12, 2019 at 3:53 PM Debasish Ghosh 
>>> wrote:
>>>
 Hello -

 Facing an issue with avro serialization with Scala case classes
 generated through avrohugger ..
 Scala case classes generated by avrohugger have the avro schema in the
 companion object. This is a sample generated class (details elided) ..

 case class Data(var id: Int, var name: String) extends
 org.apache.avro.specific.SpecificRecordBase {
   def this() = this(0, "")
   def get(field$: Int): AnyRef = {
 //..
   }
   def put(field$: Int, value: Any): Unit = {
 //..
   }
   def getSchema(): org.apache.avro.Schema = Data.SCHEMA$
 }
 object Data {
   val SCHEMA$ = new
 org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Data\",\"namespace\":\"pipelines.flink.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}")
 }

 Flink 1.8 avro serializer fails on this as Avro looks for a SCHEMA$
 property in the class & is unable to use Java reflection to identify the
 SCHEMA$ in the companion object. The exception that I get is the
 following ..

 java.lang.RuntimeException: Serializing the source elements failed:
> avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException:
> org.apache.avro.AvroRuntimeException: Not a Specific class: class
> pipelines.flink.avro.Data


 Any help or workaround will be appreciated ..

 regards.

 --
 Debasish Ghosh
 http://manning.com/ghosh2
 http://manning.com/ghosh

 Twttr: @debasishg
 Blog: http://debasishg.blogspot.com
 Code: http://github.com/debasishg

>>>
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
>
>

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


Re: Flink 1.9, MapR secure cluster, high availability

2019-08-30 Thread Stephan Ewen
Could you share the stack trace where the failure occurs, so we can see why
the Flink ZK is used during MapR FS access?

/CC Till and Tison - just FYI

On Fri, Aug 30, 2019 at 9:40 AM Maxim Parkachov 
wrote:

> Hi Stephan,
>
> With previous versions (I tried around 1.7), I always had to compile the MapR
> Hadoop dependencies to get it working.
> With 1.9 I took the Hadoop-less Flink, which worked with MapR FS until I
> switched on HA.
> So it is hard to say if this is a regression or not.
>
> The error happens when Flink tries to initialize BLOB storage on MapR FS.
> Without HA it takes the zookeeper from the classpath (MapR's
> org.apache.zookeeper), and with HA it takes the shaded one.
>
> After fixing a couple of issues with the pom, I was able to compile Flink with
> the MapR zookeeper, and now when I start in HA mode it uses the shaded
> zookeeper (which is now MapR's) to initialize the BLOB storage and
> org.apache.zookeeper (which is MapR's as well) for HA recovery.
>
> It works, but I was expecting it to work without compiling the MapR
> dependencies.
>
> Hope this helps,
> Maxim.
>
> On Thu, Aug 29, 2019 at 7:00 PM Stephan Ewen  wrote:
>
>> Hi Maxim!
>>
>> The change of the MapR dependency should not have an impact on that.
>> Do you know if the same thing worked in prior Flink versions? Is that a
>> regression in 1.9?
>>
>> The exception that you report, is that from Flink's HA services trying to
>> connect to ZK, or from the MapR FS client trying to connect to ZK?
>>
>> Best,
>> Stephan
>>
>>
>> On Tue, Aug 27, 2019 at 11:03 AM Maxim Parkachov 
>> wrote:
>>
>>> Hi everyone,
>>>
>>> I'm testing release 1.9 on MapR secure cluster. I took flink binaries
>>> from download page and trying to start Yarn session cluster. All MapR
>>> specific libraries and configs are added according to documentation.
>>>
>>> When I start yarn-session without high availability, it uses the zookeeper
>>> from the MapR distribution (org.apache.zookeeper), correctly connects to
>>> the cluster, and access to maprfs works as expected.
>>>
>>> But if I add zookeeper as the high-availability option, it tries to use
>>> the shaded zookeeper instead of the MapR one, and this one cannot connect
>>> with MapR credentials:
>>>
>>> 2019-08-27 10:42:45,240 ERROR 
>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient
>>>   - An error: (java.security.PrivilegedActionException: 
>>> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
>>> GSSException: No valid credentials provided (Mechanism level: Failed to 
>>> find any Kerberos tgt)]) occurred when evaluating Zookeeper Quorum Member's 
>>>  received SASL token. Zookeeper Client will go to AUTH_FAILED state.
>>> 2019-08-27 10:42:45,240 ERROR 
>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL 
>>> authentication with Zookeeper Quorum member failed: 
>>> javax.security.sasl.SaslException: An error: 
>>> (java.security.PrivilegedActionException: 
>>> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
>>> GSSException: No valid credentials provided (Mechanism level: Failed to 
>>> find any Kerberos tgt)]) occurred when evaluating Zookeeper Quorum Member's 
>>>  received SASL token. Zookeeper Client will go to AUTH_FAILED state.
>>> I tried to use a separate zookeeper cluster for HA, but maprfs still
>>> doesn't work.
>>>
>>> Is this related to the removal of the MapR-specific settings in release
>>> 1.9? Should I still compile a custom version of Flink with the MapR
>>> dependencies? (I am trying to do that now, but getting some errors during
>>> compilation.)
>>>
>>> Can I somehow force Flink to use the MapR zookeeper even in HA mode?
>>>
>>> Thanks in advance,
>>> Maxim.
>>>
>>>
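
For context, the HA options that switch Flink onto the shaded zookeeper code
path are the standard flink-conf.yaml settings. A minimal sketch with
placeholder hosts (MapR's zookeeper typically listens on port 5181):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host-1:5181,zk-host-2:5181
high-availability.storageDir: maprfs:///flink/ha/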


[SURVEY] Is the default restart delay of 0s causing problems?

2019-08-30 Thread Till Rohrmann
Hi everyone,

I wanted to reach out to you and ask whether decreasing the default delay
to `0 s` for the fixed delay restart strategy [1] is causing trouble. A
user reported that he would like to increase the default value because it
can cause restart storms in case of systematic faults [2].

The downside of increasing the default delay would be a slightly increased
restart time if this config option is not explicitly set.

[1] https://issues.apache.org/jira/browse/FLINK-9158
[2] https://issues.apache.org/jira/browse/FLINK-11218

Cheers,
Till
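
For anyone who would rather not depend on the default at all, the strategy
can be pinned explicitly in code. A minimal sketch with the standard API (the
attempt count and delay are illustrative):

import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// 3 attempts with a 10 s pause between them, instead of the 0 s default
env.setRestartStrategy(
  RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))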


Re: Build Flink against a vendor specific Hadoop version

2019-08-30 Thread Elise RAMÉ
Thank you all!
The classpath option works for me and is easier, so I'll do it this way.

About flink-shaded and vendor-repos, would it be helpful if I described this
issue in a Jira ticket?

> On 30 Aug 2019, at 11:43, Chesnay Schepler wrote:
> 
> It appears we did not port the vendor-repos profile to flink-shaded.
> 
> If the classpath option is not viable in your case (for example because you
> cannot use child-first classloading), then currently you would have to set up
> the vendor repos yourself in your local maven installation.
> 
> On 29/08/2019 18:48, Stephan Ewen wrote:
>> The easiest thing is not to build Flink against a specific Hadoop version
>> at all, but just to take plain Flink (Hadoop-free) and export the
>> HADOOP_CLASSPATH variable to point to the vendor libraries.
>> 
>> Does that work for you?
>> 
>> On Thu, Aug 29, 2019 at 4:15 PM Elise RAMÉ wrote:
>> Hi all,
>> 
>> I need to build Flink 1.9 against a vendor-specific Hadoop version, and
>> something bothers me in the Building Flink from Source documentation (1.9
>> and 1.10-SNAPSHOT versions).
>> 
>> As far as I understand this documentation, I have to build flink-shaded
>> using the vendor-repos Maven profile:
>> mvn clean install -DskipTests -Pvendor-repos 
>> -Dhadoop.version=2.7.3.2.6.5.0-292
>> 
>> I tried it using flink-shaded 7.0 (the version given on the Flink downloads
>> page), 8.0, and the master branch, and always got the same error (attached
>> screenshot).
>> 
>> It seems to me that the flink-shaded-hadoop module has been moved from
>> Flink to flink-shaded recently, and I found the expected profile in the
>> Flink pom.xml file:
>> https://github.com/apache/flink/blob/3079d11913f153ec40c75afb5356fd3be1a1e550/pom.xml#L1037
>>  
>> 
>> So I succeeded in building flink-shaded, and then Flink, against my
>> specific version of Hadoop by adding this profile to the flink-shaded
>> pom.xml first.
>> 
>> Did I misunderstand the documentation, or does the vendor-repos profile
>> indeed have to be defined in the flink-shaded pom.xml?
>> 
>> Thanks, 
>> Elise
> 
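
For reference, the Hadoop-free approach above boils down to exporting the
vendor's classpath before starting Flink. A minimal sketch; the exact command
and session options depend on the distribution and deployment mode:

# make the vendor's Hadoop jars visible to Flink instead of bundling them
export HADOOP_CLASSPATH=`hadoop classpath`
./bin/yarn-session.sh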



Re: problem with avro serialization

2019-08-30 Thread Debasish Ghosh
Hello Aljoscha -

I made a comment on your PR (
https://github.com/apache/flink/pull/9565/files#r319598469). With the
suggested fix it runs fine .. Thanks.

regards.

On Fri, Aug 30, 2019 at 4:48 PM Debasish Ghosh 
wrote:

> Thanks a lot .. sure I can do a build with this PR and check.
>
> regards.
>
> On Fri, Aug 30, 2019 at 2:20 PM Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> I cut a PR that should fix this issue for Avrohugger:
>> https://github.com/apache/flink/pull/9565
>>
>> Would you be able to build this and see if it solves your problem?
>>
>> Best,
>> Aljoscha
>>
>> On 30. Aug 2019, at 09:02, Debasish Ghosh 
>> wrote:
>>
>> From https://stackoverflow.com/a/56104518  ..
>>
>> AFAIK the only solution is to update Flink to use avro's
>>> non-reflection-based constructors in AvroInputFormat (compare).
>>
>>
>> Would love to know if there have been any plans towards fixing this issue
>> ..
>>
>> regards.
>>
>> On Thu, Aug 29, 2019 at 8:23 PM Debasish Ghosh 
>> wrote:
>>
>>> Any update on this ?
>>>
>>> regards.
>>>
>>> On Tue, May 14, 2019 at 2:22 PM Tzu-Li (Gordon) Tai 
>>> wrote:
>>>
 Hi,

 Aljoscha opened a JIRA just recently for this issue:
 https://issues.apache.org/jira/browse/FLINK-12501.

 Do you know if this is a regression from previous Flink versions?
 I'm asking just to double check, since from my understanding of the
 issue, the problem should have already existed before.

 Thanks,
 Gordon

 On Sun, May 12, 2019 at 3:53 PM Debasish Ghosh <
 ghosh.debas...@gmail.com> wrote:

> Hello -
>
> Facing an issue with avro serialization with Scala case classes
> generated through avrohugger ..
> Scala case classes generated by avrohugger have the avro schema in the
> companion object. This is a sample generated class (details elided) ..
>
> case class Data(var id: Int, var name: String) extends
> org.apache.avro.specific.SpecificRecordBase {
>   def this() = this(0, "")
>   def get(field$: Int): AnyRef = {
> //..
>   }
>   def put(field$: Int, value: Any): Unit = {
> //..
>   }
>   def getSchema(): org.apache.avro.Schema = Data.SCHEMA$
> }
> object Data {
>   val SCHEMA$ = new
> org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Data\",\"namespace\":\"pipelines.flink.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}")
> }
>
> Flink 1.8 avro serializer fails on this as Avro looks for a SCHEMA$
> property in the class and is unable to use Java reflection to identify the
> SCHEMA$ in the companion object. The exception that I get is the
> following ..
>
> java.lang.RuntimeException: Serializing the source elements failed:
>> avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException:
>> org.apache.avro.AvroRuntimeException: Not a Specific class: class
>> pipelines.flink.avro.Data
>
>
> Any help or workaround will be appreciated ..
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>

>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>>
>>
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>>
>>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-30 Thread Elkhan Dadashov
Thanks everyone for the valuable input and for sharing your experience with
tackling the issue.

Regarding the suggestions:
- We provision some common jars in all cluster nodes *-->* but this requires
depending on the Infra Team's schedule for handling/updating the common jars.
- Making the Uberjar slimmer *-->* tried even with a 200 MB Uberjar (half the
size), which did not improve much. Only 100 containers could be started in
time, but then we receive:

org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request
to start container.
This token is expired. current time is 1566422713305 found 1566422560552
Note: System times on machines may be out of sync. Check system time
and time zones.


- It would be nice to see FLINK-13184, but the expected version it will land
in is 1.10.
- Increase the replication factor --> It would be nice to have a Flink conf
option for setting the replication factor for only the Flink job jars, but
not the output. It is also challenging to set a replication factor for a
directory that does not exist yet; the new files will have the default
replication factor. Will explore the HDFS cache option (see also the command
sketch after this list).

Maybe another option can be:
- Letting yet-to-be-started TaskManagers (or NodeManagers) download the jars
from already started TaskManagers in P2P fashion, so as not to have a blocker
on HDFS replication.
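
On the replication point: for an already-uploaded jar, the factor can at
least be raised manually with the standard HDFS CLI today (the path below is
illustrative):

# raise the replication factor of the submitted job jar and wait until done
hdfs dfs -setrep -w 20 /user/flink/.flink/<application-id>/job.jar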

For comparison, a Spark job with the exact same size jar and 800 executors,
without any tuning, can start without any issue on the same cluster in less
than a minute.

*Further questions:*

*@ SHI Xiaogang:*

I see that all 800 requests are sent concurrently :

2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
 org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
container with resources . Number pending requests
793.
2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
 org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
nativeMemoryInMB=0, networkMemoryInMB=0} for job
e908cb4700d5127a0b67be035e4494f7 with allocation id
AllocationID{cb016f7ce1eac1342001ccdb1427ba07}.

2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
 org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
container with resources . Number pending requests
794.
2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
 org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
nativeMemoryInMB=0, networkMemoryInMB=0} for job
e908cb4700d5127a0b67be035e4494f7 with allocation id
AllocationID{71bbb917374ade66df4c058c41b81f4e}.
...

Can you please elaborate on the part "As containers are launched and stopped
one after another"? Any pointer to a class/method in Flink?

*@ Zhu Zhu:*

Regarding "One optimization that we take is letting yarn to reuse the
flink-dist jar which was localized when running previous jobs."

We intend to use the Flink real-time pipeline for replay from Hive/HDFS (an
offline source), to have a single pipeline for both batch and real-time. So
for a batch Flink job, the containers will be released once the job is done.
I guess your job is a real-time Flink job, so you can share the jars from
already long-running jobs.

Thanks.


On Fri, Aug 30, 2019 at 12:46 AM Jeff Zhang  wrote:

> I can think of 2 approaches:
>
> 1. Allow flink to specify the replication of the submitted uber jar.
> 2. Allow flink to specify a config flink.yarn.lib that lists all the
> flink-related jars hosted on hdfs. This way users don't need to build and
> submit a fat uber jar every time, and those flink jars hosted on hdfs can
> also have their replication specified separately.
>
>
>
> Till Rohrmann wrote on Fri, Aug 30, 2019 at 3:33 PM:
>
>> For point 2. there exists already a JIRA issue [1] and a PR. I hope that
>> we can merge it during this release cycle.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-13184
>>
>> Cheers,
>> Till
>>
>> On Fri, Aug 30, 2019 at 4:06 AM SHI Xiaogang 
>> wrote:
>>
>>> Hi Datashov,
>>>
>>> We faced similar problems in our production clusters.
>>>
>>> Now both launching and stopping of containers are performed in the main
>>> thread of YarnResourceManager. As containers are launched and stopped one
>>> after another, it usually takes a long time to bootstrap large jobs. Things
>>> get worse when some node managers get lost: Yarn will retry many times to
>>> communicate with them, leading to heartbeat timeouts of TaskManagers.
>>>
>>> Following are some efforts we made to help Flink deal with large jobs.
>>>
>>> 1. We provision some common jars in all cluster nodes and ask our users
>>> not to include these jars in their uberjar. When containers bootstrap,
>>> these jars are added to the classpath via JVM options. That way, we can
>>> efficiently reduce the size of uberjars.
>>>
>>> 2. We deploy some asynchronous threads to launch and stop containers in
>>> YarnResourceManager. The bootstrap tim

[ANNOUNCEMENT] September 2019 Bay Area Apache Flink Meetup

2019-08-30 Thread Xuefu Zhang
Hi all,

As promised, we planned to have a quarterly Flink meetup, and now it's about
time. Thus, I'm happy to announce that the next Bay Area Apache Flink Meetup
[1] is scheduled for Sept. 24 at Yelp, 140 New Montgomery in San Francisco.

Schedule:

6:30 - Doors open
6:30 - 7:00 PM Networking and Refreshments
7:00 - 8:30 PM Short talks

-- Two years of Flink @ Yelp (Enrico Canzonieri, 30m)
-- How BNP Paribas Fortis uses Flink for real-time fraud detection (David
Massart, tentative)

Please refer to the meetup page [1] for more details.

Many thanks go to Yelp for their sponsorship. At the same time, we might
still have room for one more short talk; please let me know if you are
interested.


Thanks,

Xuefu

[1] https://www.meetup.com/Bay-Area-Apache-Flink-Meetup/events/262680261/


[ANNOUNCE] Kinesis connector becomes part of Flink releases

2019-08-30 Thread Bowen Li
Hi all,

I'm glad to announce that, as #9494 was merged today,
flink-connector-kinesis is officially under the Apache 2.0 license now in the
master branch, and its artifact will be deployed to Maven Central as part of
Flink releases starting from Flink 1.10.0. Users can then use the artifact
off the shelf and no longer have to build and maintain it on their own.

It brings a much better user experience to our large AWS customer base by
making their work simpler, smoother, and more productive!
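
Once 1.10.0 is out, pulling the connector in should be the usual single Maven
dependency. The coordinates below follow the standard Flink naming scheme and
are an assumption until the release artifacts are published:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kinesis_2.11</artifactId>
  <version>1.10.0</version>
</dependency>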

Thanks everyone who participated in coding and review to drive this
initiative forward.

Cheers,
Bowen


Re: [SURVEY] Is the default restart delay of 0s causing problems?

2019-08-30 Thread Zhu Zhu
In our production, we usually override the restart delay to be 10 s.
We once encountered cases where external services were overwhelmed by
reconnections from frequently restarted tasks.
As a safer, though not optimal, option, a default delay larger than 0 s is
better in my opinion.
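
For reference, the override described above is a small configuration change;
a sketch with the standard flink-conf.yaml keys (the attempt count is
illustrative):

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s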


未来阳光 <2217232...@qq.com> wrote on Fri, Aug 30, 2019 at 10:23 PM:

> Hi,
>
>
> I think it's better to increase the default value. +1
>
>
> Best.
>
>
>
>
> -- Original Message --
> From: "Till Rohrmann";
> Sent: Friday, Aug 30, 2019, 10:07 PM
> To: "dev"; "user";
> Subject: [SURVEY] Is the default restart delay of 0s causing problems?
>
>
>
> Hi everyone,
>
> I wanted to reach out to you and ask whether decreasing the default delay
> to `0 s` for the fixed delay restart strategy [1] is causing trouble. A
> user reported that he would like to increase the default value because it
> can cause restart storms in case of systematic faults [2].
>
> The downside of increasing the default delay would be a slightly increased
> restart time if this config option is not explicitly set.
>
> [1] https://issues.apache.org/jira/browse/FLINK-9158
> [2] https://issues.apache.org/jira/browse/FLINK-11218
>
> Cheers,
> Till


Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-30 Thread SHI Xiaogang
Hi Dadashov,

You may have a look at the method YarnResourceManager#onContainersAllocated,
which launches containers (via NMClient#startContainer) after they are
allocated. The launching is performed in the main thread of
YarnResourceManager and is synchronous/blocking. Consequently, the containers
will be launched one by one.

Regards,
Xiaogang
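
To make the discussion concrete, here is a rough standalone sketch of the
asynchronous-launch idea: fanning blocking startContainer-style calls out to
a thread pool instead of running them one by one on the main thread. This is
illustrative only, not Flink's actual YarnResourceManager code, and all names
are made up:

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

object AsyncLaunchSketch {
  // Dedicated pool so blocking container launches never occupy the main thread.
  private implicit val launchPool: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))

  // Kick off every launch concurrently; each blocking call runs on the pool.
  def launchAll[C](containers: Seq[C])(startContainer: C => Unit): Seq[Future[Unit]] =
    containers.map(c => Future(startContainer(c)))
}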

Elkhan Dadashov wrote on Sat, Aug 31, 2019 at 2:37 AM:

> Thanks everyone for the valuable input and for sharing your experience with
> tackling the issue.
>
> Regarding the suggestions:
> - We provision some common jars in all cluster nodes *-->* but this requires
> depending on the Infra Team's schedule for handling/updating the common jars.
> - Making the Uberjar slimmer *-->* tried even with a 200 MB Uberjar (half the
> size), which did not improve much. Only 100 containers could be started in
> time, but then we receive:
>
> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
> start container.
> This token is expired. current time is 1566422713305 found 1566422560552
> Note: System times on machines may be out of sync. Check system time and time 
> zones.
>
>
> - It would be nice to see FLINK-13184, but the expected version it will land
> in is 1.10.
> - Increase the replication factor --> It would be nice to have a Flink conf
> option for setting the replication factor for only the Flink job jars, but
> not the output. It is also challenging to set a replication factor for a
> directory that does not exist yet; the new files will have the default
> replication factor. Will explore the HDFS cache option.
>
> Maybe another option can be:
> - Letting yet-to-be-started TaskManagers (or NodeManagers) download the jars
> from already started TaskManagers in P2P fashion, so as not to have a blocker
> on HDFS replication.
>
> For comparison, a Spark job with the exact same size jar and 800 executors,
> without any tuning, can start without any issue on the same cluster in less
> than a minute.
>
> *Further questions:*
>
> *@ SHI Xiaogang:*
>
> I see that all 800 requests are sent concurrently :
>
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>  org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
> container with resources . Number pending requests
> 793.
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>  org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} for job
> e908cb4700d5127a0b67be035e4494f7 with allocation id
> AllocationID{cb016f7ce1eac1342001ccdb1427ba07}.
>
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>  org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
> container with resources . Number pending requests
> 794.
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>  org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} for job
> e908cb4700d5127a0b67be035e4494f7 with allocation id
> AllocationID{71bbb917374ade66df4c058c41b81f4e}.
> ...
>
> Can you please elaborate on the part "As containers are launched and stopped
> one after another"? Any pointer to a class/method in Flink?
>
> *@ Zhu Zhu:*
>
> Regarding "One optimization that we take is letting yarn to reuse the
> flink-dist jar which was localized when running previous jobs."
>
> We intend to use the Flink real-time pipeline for replay from Hive/HDFS (an
> offline source), to have a single pipeline for both batch and real-time. So
> for a batch Flink job, the containers will be released once the job is done.
> I guess your job is a real-time Flink job, so you can share the jars from
> already long-running jobs.
>
> Thanks.
>
>
> On Fri, Aug 30, 2019 at 12:46 AM Jeff Zhang  wrote:
>
>> I can think of 2 approaches:
>>
>> 1. Allow flink to specify the replication of the submitted uber jar.
>> 2. Allow flink to specify a config flink.yarn.lib that lists all the
>> flink-related jars hosted on hdfs. This way users don't need to build and
>> submit a fat uber jar every time, and those flink jars hosted on hdfs can
>> also have their replication specified separately.
>>
>>
>>
>> Till Rohrmann wrote on Fri, Aug 30, 2019 at 3:33 PM:
>>
>>> For point 2. there exists already a JIRA issue [1] and a PR. I hope that
>>> we can merge it during this release cycle.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-13184
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Aug 30, 2019 at 4:06 AM SHI Xiaogang 
>>> wrote:
>>>
 Hi Datashov,

 We faced similar problems in our production clusters.

 Now both launching and stopping of containers are performed in the main
 thread of YarnResourceManager. As containers are launched and stopped one
 after another, it usually takes a long time to bootstrap large jobs. Things
 get worse when some node managers g