Job Manager killed by Kubernetes during recovery

2018-08-18 Thread Bruno Aranda
Hi,

I am experiencing an issue when a job manager is trying to recover using an
HA setup. When the job manager starts again and tries to resume from the
last checkpoints, it gets killed by Kubernetes (I guess), since I can see
the following in the logs while the jobs are being deployed:

INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.

I am requesting enough memory for it, 3000Gi, and it is configured to use
2048Gb of memory. I have tried to increase the max perm size, but did not
see an improvement.

Any suggestions to help diagnose this?

I have the following:

Flink 1.6.0 (same with 1.5.1)
Azure AKS with Kubernetes 1.11
State management using RocksDB with checkpoints stored in Azure Data Lake

Thanks!

Bruno


Re: Job Manager killed by Kubernetes during recovery

2018-08-22 Thread Bruno Aranda
Actually, I have found the issue. It was a simple thing, really, once you
know it of course.

It was caused by the livenessProbe kicking in too early. For a Flink
cluster with several jobs, the default 30 seconds I was using (taken from
the Flink helm chart in the examples) was not enough to let the job manager
fully recover and start. Increasing it fixed the issue.

I ended up with a job manager with 4000Gi as limit, 3000Gi requested, and
configured to use 2048Gb. So I guess that was a red herring for me.

I managed to see what was going on by using the kubectl "describe" action,
where the failed probe was clearly reported as an event.
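
For anyone hitting the same thing, the fix is just to give the probe more
slack in the job manager deployment. A minimal sketch of the relevant
snippet, assuming a TCP probe against the default job manager RPC port (the
probe type and the values are from our setup, not from the chart, so adjust
to yours):

livenessProbe:
  tcpSocket:
    port: 6123             # assumes the default job manager RPC port
  initialDelaySeconds: 120 # was effectively 30s before; too short to recover many jobs
  periodSeconds: 30
  failureThreshold: 3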

Thanks Vino and Till for your time!

Bruno

On Tue, 21 Aug 2018 at 10:21 Till Rohrmann  wrote:

> Hi Bruno,
>
> in order to debug this problem we would need a bit more information. In
> particular, the logs of the cluster entrypoint and your K8s deployment
> specification would be helpful. If you have some memory limits specified
> these would also be interesting to know.
>
> Cheers,
> Till
>
> On Sun, Aug 19, 2018 at 2:43 PM vino yang  wrote:
>
>> Hi Bruno,
>>
>> Ping Till for you, he may give you some useful information.
>>
>> Thanks, vino.
>>
>> Bruno Aranda  于2018年8月19日周日 上午6:57写道:
>>
>>> Hi,
>>>
>>> I am experiencing an issue when a job manager is trying to recover using
>>> a HA setup. When the job manager starts again and tries to resume from the
>>> last checkpoints, it gets killed by Kubernetes (I guess), since I can see
>>> the following in the logs while the jobs are deployed:
>>>
>>> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>> RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>>>
>>> I am requesting enough memory for it, 3000Gi, and it is configured to
>>> use 2048Gb of memory. I have tried to increase the max perm size, but did
>>> not see an improvement.
>>>
>>> Any suggestions to help diagnose this?
>>>
>>> I have the following:
>>>
>>> Flink 1.6.0 (same with 1.5.1)
>>> Azure AKS with Kubernetes 1.11
>>> State management using RocksDB with checkpoints stored in Azure Data Lake
>>>
>>> Thanks!
>>>
>>> Bruno
>>>
>>>


Can't get the FlinkKinesisProducer to work against Kinesalite for tests

2018-09-26 Thread Bruno Aranda
Hi,

We have started to use Kinesis with Flink and we need to be able to test
when a Flink job writes to Kinesis. For that, we use a Docker image with
Kinesalite.

To configure the producer, we do as explained in the docs [1].

However, if we use this code, the job submission fails, because the Flink
Kinesis connector expects the configuration to have either the endpoint or
the region, but not both and not neither (there is also a typo in the error
message, where 'aws.region' is mentioned twice) [2].

However, if we only specify the endpoint, then the KPL will fail,
complaining that there is no Region configured. It does look like the
Kinesis producer may not be picking up the endpoint at all? We are confused.
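
For reference, the endpoint-only configuration we are trying follows the
docs [1] and looks roughly like this (a sketch; the credentials and the
endpoint are placeholders for our Kinesalite container):

import java.util.Properties
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants

val producerConfig = new Properties()
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "dummy")
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "dummy")
// pointing the connector at Kinesalite instead of a real region
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "https://localhost:4567")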

On the other hand, the Flink consumer works as expected and the endpoint
pointing to Kinesalite works fine. The consumer follows a different path
and creates the AWS client through a call to AWSUtil [3], which will take
the endpoint into account.

Are we missing something? We have tried this in Flink versions from 1.3.2
to 1.6.1, building the Kinesis connector against the latest KPLs.

Any help is appreciated,

Thanks!

Bruno

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#using-non-aws-kinesis-endpoints-for-testing
[2]
https://github.com/apache/flink/blob/a36d1999f743d00c4fac8fdf61ad85a4b5d5f3bc/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java#L272
[3]
https://github.com/apache/flink/blob/a36d1999f743d00c4fac8fdf61ad85a4b5d5f3bc/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java#L66


Re: Can't get the FlinkKinesisProducer to work against Kinesalite for tests

2018-09-27 Thread Bruno Aranda
Hi again,

We managed in the end to get data into Kinesalite using the
FlinkKinesisProducer, but to do so we had to use a different configuration:
ignoring the 'aws.endpoint' setting and going for the properties that the
underlying Kinesis producer (KPL) expects. So, to our FlinkKinesisProducer
we pass configuration such as:

producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
producerConfig.put("KinesisEndpoint", "localhost")
producerConfig.put("KinesisPort", "4567")
producerConfig.put("VerifyCertificate", "false")

We had to make sure that Kinesalite itself was being started with the
`--ssl` parameter, in order to use TLS and be available over HTTPS.
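
For completeness, this is roughly how we plug that configuration into the
sink (a sketch; the stream name, the schema and the `stream` variable are
placeholders for our actual job):

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer

val producer = new FlinkKinesisProducer[String](new SimpleStringSchema(), producerConfig)
producer.setDefaultStream("test-stream") // placeholder stream name
producer.setDefaultPartition("0")
stream.addSink(producer) // 'stream' is our DataStream[String]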

And, very importantly as well, our tests run in Docker, and we found out
just before throwing in the towel that you cannot use an Alpine-based image
for this. If you want the Amazon KPL to work fine, it needs to be one of
the Debian-based images running in Docker.

Hope this saves someone all the days we have spent looking at it :)

Cheers,

Bruno

On Wed, 26 Sep 2018 at 14:59 Bruno Aranda  wrote:

> Hi,
>
> We have started to use Kinesis with Flink and we need to be able to test
> when a Flink jobs writes to Kinesis. For that, we use a docker image with
> Kinesalite.
>
> To configure the producer, we do like it is explained in the docs [1].
>
> However, if we use this code, the job submission is going to fail, because
> the Flink Kinesis connector expect the configuration to have either the
> endpoint or the region, but not both, or none. (there is a typo in the
> error message as well where 'aws.region' is metioned twice) [2].
>
> However, if we only specify the endpoint, then the KPL will fail
> complaining that there is no Region configured. It does look like Kinesis
> may not be trying to set up the endpoint? We are confused.
>
> On the other hand, the Flink consumer works as expected and the endpoint
> pointing to Kinesalite works fine. The consumer follows a different path
> and creates the AWS client through a call to AWSUtil [3], which will take
> the endpoint into account.
>
> Are we missing something? We have tried this in Flink versions from 1.3.2
> to 1.6.1, building the kinesis connector against the latests KPLs.
>
> Any help is appreciated,
>
> Thanks!
>
> Bruno
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#using-non-aws-kinesis-endpoints-for-testing
> [2]
> https://github.com/apache/flink/blob/a36d1999f743d00c4fac8fdf61ad85a4b5d5f3bc/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java#L272
> [3]
> https://github.com/apache/flink/blob/a36d1999f743d00c4fac8fdf61ad85a4b5d5f3bc/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java#L66
>
>


IndexOutOfBoundsException on deserialization after updating to 1.6.1

2018-10-17 Thread Bruno Aranda
Hi,

We are trying to update from 1.3.2 to 1.6.1, but one of our jobs keeps
throwing an exception during deserialization:

java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.get(ArrayList.java:433)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
at
org.apache.flink.api.scala.typeutils.OptionSerializer.copy(OptionSerializer.scala:51)
at
org.apache.flink.api.scala.typeutils.OptionSerializer.copy(OptionSerializer.scala:29)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
at
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
at scala.collection.immutable.List.foreach(List.scala:392)
at
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
at
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
at
org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
at
org.apache.flink.streaming.api.scala.function.StatefulFunction$class.applyWithState(StatefulFunction.scala:41)
...

We haven't changed the code for the job (apart from updating to 1.6.1), so
we are not sure what may have changed. This happens in a RichMapFunction
that extends StatefulFunction. The state is a case class, and we create its
state serializer with the following code:

override protected lazy val stateSerializer: TypeSerializer[MyCaseClass] =
  api.scala.createTypeInformation[MyCaseClass].createSerializer(getRuntimeContext.getExecutionConfig)

Any clues on what may be going on or where to look further? This was not an
issue on 1.3.2...

Thanks!

Bruno


Re: IndexOutOfBoundsException on deserialization after updating to 1.6.1

2018-10-17 Thread Bruno Aranda
Hi,

Thanks for your reply. We are still trying to isolate it, because this job
was using a more complex state. I think it is caused by a case class that
has an Option[MyOtherClass], where MyOtherClass is an enumeration
implemented using the enumeratum library. I have changed that option to be
just an Option[Boolean], and the failure seems not to happen anymore.
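
To give an idea of the shape of the state involved, it looks roughly like
this (a sketch with placeholder names; the real case class has more fields):

import enumeratum._

// placeholder for the enumeratum-based type that seems to trigger the failure
sealed trait MyOtherClass extends EnumEntry
object MyOtherClass extends Enum[MyOtherClass] {
  val values = findValues
  case object SomeValue  extends MyOtherClass
  case object OtherValue extends MyOtherClass
}

// the state case class holds an Option of it
case class MyCaseClass(id: String, other: Option[MyOtherClass])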

We may continue with the Boolean for now. I guess this was not a problem in
an earlier Flink version, though; possibly a Kryo change?

Cheers,

Bruno

On Wed, 17 Oct 2018 at 15:40 aitozi  wrote:

> Hi,Bruno Aranda
>
> Could you provide an complete example to reproduce the exception?
>
> Thanks,
> Aitozi
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Rich variant for Async IO in Scala

2018-11-08 Thread Bruno Aranda
Hi,

I see that the AsyncFunction for Scala does not seem to have a rich variant
like the Java one. Is there a particular reason for this? Is there any
workaround?

Thanks!

Bruno


Re: Rich variant for Async IO in Scala

2018-11-13 Thread Bruno Aranda
Hi,

Tried again last night. The problem is that I was trying to use
org.apache.flink.streaming.api.*scala*.AsyncDataStream, and that won't
compile against the RichAsyncFunction. I could change it to use
org.apache.flink.streaming.api.*datastream*.AsyncDataStream instead, but it
is not as elegant, as it requires the result to be a Java collection. But
it works.
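
In case it is useful to someone else, the workaround looks roughly like
this (a sketch; the enrichment logic, the names and the timeout are
placeholders):

import java.util.Collections
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.datastream.AsyncDataStream
import org.apache.flink.streaming.api.functions.async.{ResultFuture, RichAsyncFunction}
import org.apache.flink.streaming.api.scala._

// placeholder async function; a real one would call an external service
class EnrichFunction extends RichAsyncFunction[String, String] {
  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit =
    // the Java API expects a Java collection as the result
    resultFuture.complete(Collections.singleton(input.toUpperCase))
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val source: DataStream[String] = env.fromElements("a", "b", "c")

// use the Java AsyncDataStream helper on the underlying Java stream
val enriched = AsyncDataStream.unorderedWait(source.javaStream, new EnrichFunction, 5, TimeUnit.SECONDS)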

Thanks!

Bruno

On Mon, 12 Nov 2018 at 16:43 Timo Walther  wrote:

> Hi Bruno,
>
> `org.apache.flink.streaming.api.functions.async.RichAsyncFunction`
> should also work for the Scala API. `RichMapFunction` or
> `RichFilterFunction` are also shared between both APIs.
>
> Is there anything that blocks you from using it?
>
> Regards,
> Timo
>
> Am 09.11.18 um 01:38 schrieb Bruno Aranda:
> > Hi,
> >
> > I see that the AsyncFunction for Scala does not seem to have a rich
> > variant like the Java one. Is there a particular reason for this? Is
> > there any workaround?
> >
> > Thanks!
> >
> > Bruno
>
>
>


Subtask much slower than the others when creating checkpoints

2019-01-08 Thread Bruno Aranda
Hi,

We are using Flink 1.6.1 at the moment and we have a streaming job
configured to create a checkpoint every 10 seconds. Looking at the
checkpointing times in the UI, we can see that one subtask is much slower
creating the checkpoint, at least in its "End to End Duration", and this
seems caused by a longer "Checkpoint Duration (Async)".

For instance, in the attached screenshot, while most of the subtasks take
half a second, one (and it is always just one) takes 2 seconds.

But we have worse problems. We have seen cases where the checkpoint times
out for one task: while most take one second, the outlier takes more than
5 minutes (which is the max time we allow for a checkpoint). This can
happen if there is back pressure. We only allow one checkpoint at a time as
well.
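
For context, our checkpoint configuration is roughly the following (a
sketch reflecting the settings described above; everything else is left at
the defaults):

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(10 * 1000) // a checkpoint every 10 seconds
env.getCheckpointConfig.setCheckpointTimeout(5 * 60 * 1000) // 5 minutes max per checkpoint
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) // only one checkpoint at a time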

Why could one subtask take more time? This job reads from Kafka partitions
and hashes by key, and we don't see any major data skew between the
partitions. Does one partition do more work?

We do have a cluster of 20 machines, in EMR, with TMs that have multiple
slots (in legacy mode).

Is this something that could have been fixed in a more recent version?

Thanks for any insight!

Bruno


Re: Subtask much slower than the others when creating checkpoints

2019-01-15 Thread Bruno Aranda
Hi,

Just an update from our side. We couldn't find anything specific in the
logs and the problem is not easily reproducible. This week, the system is
running fine, which makes me suspect some resourcing issue as well. So far
we haven't been able to find the reason, though we have discarded a few
things. We consume from Kafka, and the load was properly balanced. We
couldn't find a relationship between the message rate and the task manager
checkpoint being slower. The problem could happen even at the times of day
when we get fewer messages. After a Flink session restart (using AWS EMR),
another TM on a different machine could become the one with the longer
checkpoints.

We are now trying to reproduce the problem in a different cluster by
sending the data that was crossing the system while we saw the problems, to
see if we can identify something specific to it. But our data is pretty
uniform, so we are not sure. So far we have only seen this problem in our
Prod environment and not when running stress tests with much higher load.

Will come back if we figure anything out.

Thanks,

Bruno

On Tue, 15 Jan 2019 at 10:33, Till Rohrmann  wrote:

> Same here Pasquale, the logs on DEBUG log level could be helpful. My guess
> would be that the respective tasks are overloaded or there is some resource
> congestion (network, disk, etc).
>
> You should see in the web UI the number of incoming and outgoing events.
> It would be good to check that the events are similarly sized and can be
> computed in roughly the same time.
>
> Cheers,
> Till
>
> On Mon, Jan 14, 2019 at 4:07 PM Pasquale Vazzana 
> wrote:
>
>> I have the same problem, even more impactful. Some subtasks stall forever
>> quite consistently.
>> I am using Flink 1.7.1, but I've tried downgrading to 1.6.3 and it didn't
>> help.
>> The Backend doesn't seem to make any difference, I've tried Memory, FS
>> and RocksDB back ends but nothing changes. I've also tried to change the
>> medium, local spinning disk, SAN or mounted fs but nothing helps.
>> Parallelism is the only thing which mitigates the stalling, when I set 1
>> everything works but if I increase the number of parallelism then
>> everything degrades, 10 makes it very slow 30 freezes it.
>> It's always one of two subtasks, most of them does the checkpoint in few
>> milliseconds but there is always at least one which stalls for minutes
>> until it times out. The Alignment seems to be a problem.
>> I've been wondering whether some Kafka partitions where empty but there
>> is not much data skew and the keyBy uses the same key strategy as the Kafka
>> partitions, I've tried to use murmur2 for hashing but it didn't help either.
>> The subtask that seems causing problems seems to be a CoProcessFunction.
>> I am going to debug Flink but since I'm relatively new to it, it might
>> take a while so any help will be appreciated.
>>
>> Pasquale
>>
>>
>> From: Till Rohrmann 
>> Sent: 08 January 2019 17:35
>> To: Bruno Aranda 
>> Cc: user 
>> Subject: Re: Subtask much slower than the others when creating checkpoints
>>
>> Hi Bruno,
>>
>> there are multiple reasons why one of the subtasks can take longer for
>> checkpointing. It looks as if there is not much data skew since the state
>> sizes are relatively equal. It also looks as if the individual tasks all
>> start at the same time with the checkpointing which indicates that there
>> mustn't be a lot of back-pressure in the DAG (or all tasks were equally
>> back-pressured). This narrows the problem cause down to the asynchronous
>> write operation. One potential problem could be if the external system to
>> which you write your checkpoint data has some kind of I/O limit/quota.
>> Maybe the sum of write accesses deplete the maximum quota you have. You
>> could try whether running the job with a lower parallelism solves the
>> problems.
>>
>> For further debugging it could be helpful to get access to the logs of
>> the JobManager and the TaskManagers on DEBUG log level. It could also be
>> helpful to learn which state backend you are using.
>>
>> Cheers,
>> Till
>>
>> On Tue, Jan 8, 2019 at 12:52 PM Bruno Aranda <mailto:bara...@apache.org>
>> wrote:
>> Hi,
>>
>> We are using Flink 1.6.1 at the moment and we have a streaming job
>> configured to create a checkpoint every 10 seconds. Looking at the
>> checkpointing times in the UI, we can see that one subtask is much slower
>> creating the endpoint, at least in its "End to End Duration", and seems
>> caused by a longer "Checkpoint Dur

Re: Subtask much slower than the others when creating checkpoints

2019-01-15 Thread Bruno Aranda
Hi Stefan,

Thanks for your suggestion. As you may see from the original screenshot,
the actual state is small, and even smaller than that of some of the other
subtasks. We are consuming from a Kafka topic with 600 partitions, with
parallelism set to around 20. Our metrics show that all the subtasks are
getting a roughly equal share of the load. In addition to the balanced
consumption, the first operation in that particular job is a keyBy, so it
hashes and shuffles the data, producing loads that are also balanced
according to the metrics. The second operation is the one suffering from
the issue, and it just transforms the data and puts it into another Kafka
topic.

Thanks,

Bruno

On Tue, 15 Jan 2019 at 11:03, Stefan Richter 
wrote:

> Hi,
>
> I have seen a few cases where for certain jobs a small imbalance in the
> state partition assignment did cascade into a larger imbalance of the job.
> If your max parallelism mod parallelism is not 0, it means that some tasks
> have one partition more than others. Again, depending on how much
> partitions you have assigned to each task, in the extremest case when every
> task has 1 key group, except for one that has 2, imbalance can be 100%.
> Maybe you could check for that, especially if you were running at a
> different parallelism in production and stress testing. This would also
> explain why the any checkpoint duration is longer for a task, because it
> would have much more state - assuming that the load is kind of balanced
> between partitions.
>
> Best,
> Stefan
>
> On 15. Jan 2019, at 11:42, Bruno Aranda  wrote:
>
> Hi,
>
> Just an update from our side. We couldn't find anything specific in the
> logs and the problem is not easy reproducible. This week, the system is
> running fine, which makes me suspicious as well of some resourcing issue.
> But so far, we haven't been able to find the reason though we have
> discarded a few things. We consume from Kafka, and the load was properly
> balanced. We couldn't find a relationship between rate and the task manager
> checkpoint being slower. The problem could happen even at the times of day
> where we get less messages. After a flink session restart (using AWS EMR),
> another TM in a different machine could have been the one with the longer
> checkpoints.
>
> We are now trying to reproduce the problem in a different cluster by
> trying to send the data that was crossing the system while we saw the
> problems and see if we can identify something specific to it. But our data
> is pretty uniform, so not sure, and so far we have only seen this problem
> in our Prod environment and not when running stress tests which much higher
> load.
>
> Will come back if we figure anything out.
>
> Thanks,
>
> Bruno
>
> On Tue, 15 Jan 2019 at 10:33, Till Rohrmann  wrote:
>
>> Same here Pasquale, the logs on DEBUG log level could be helpful. My
>> guess would be that the respective tasks are overloaded or there is some
>> resource congestion (network, disk, etc).
>>
>> You should see in the web UI the number of incoming and outgoing events.
>> It would be good to check that the events are similarly sized and can be
>> computed in roughly the same time.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jan 14, 2019 at 4:07 PM Pasquale Vazzana 
>> wrote:
>>
>>> I have the same problem, even more impactful. Some subtasks stall
>>> forever quite consistently.
>>> I am using Flink 1.7.1, but I've tried downgrading to 1.6.3 and it
>>> didn't help.
>>> The Backend doesn't seem to make any difference, I've tried Memory, FS
>>> and RocksDB back ends but nothing changes. I've also tried to change the
>>> medium, local spinning disk, SAN or mounted fs but nothing helps.
>>> Parallelism is the only thing which mitigates the stalling, when I set 1
>>> everything works but if I increase the number of parallelism then
>>> everything degrades, 10 makes it very slow 30 freezes it.
>>> It's always one of two subtasks, most of them does the checkpoint in few
>>> milliseconds but there is always at least one which stalls for minutes
>>> until it times out. The Alignment seems to be a problem.
>>> I've been wondering whether some Kafka partitions where empty but there
>>> is not much data skew and the keyBy uses the same key strategy as the Kafka
>>> partitions, I've tried to use murmur2 for hashing but it didn't help either.
>>> The subtask that seems causing problems seems to be a CoProcessFunction.
>>> I am going to debug Flink but since I'm relatively new to it, it might
>>> take a while so any help wi

Re: Flink and S3 AWS keys rotation

2019-02-07 Thread Bruno Aranda
Hi,

You can give specific IAM instance roles to the instances running Flink.
This way you never expose access keys anywhere. As the docs say, that is
the recommended way (and not just for Flink, but for any service you want
to use: never set it up with AWS credentials in config). IAM will deal with
the security transparently, and you can be extremely restrictive about
which policies you attach to the instance roles.

Cheers,

Bruno

On Thu, 7 Feb 2019 at 13:38, Kostas Kloudas  wrote:

> Hi Antonio,
>
> I am  cc'ing Till who may have something to say on this.
>
> Cheers,
> Kostas
>
> On Thu, Feb 7, 2019 at 1:32 PM Antonio Verardi  wrote:
>
>> Hi there,
>>
>> I'm trying out to run Flink on Kubernetes and I run into a problem with
>> the way Flink sets up AWS credentials to talk with S3 and the way we manage
>> AWS secrets in my company.
>>
>> To give permissions to Flink I am using AWS keys embedded in flink.conf,
>> as per
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#configure-access-credentials.
>> The problem there is that we rotate daily our AWS keys in order to mitigate
>> any eventual leak of keys. In order to make Flink pick up the new keys I
>> understand I have to restart it, but that means downtime, especially for
>> the jobs which have a large state to save.
>>
>> I know that in Kubernetes land there are these two projects,
>> https://github.com/uswitch/kiam and https://github.com/jtblin/kube2iam
>> , that make possible to associate
>> IAM policies to pods/containers. But they are not part of the "official"
>> Kubernetes software, which kinda surprises me.
>>
>> Did anyone run into a similar problem? If so, how did you solve it?
>>
>> Cheers,
>> Antonio
>>
>


Re: StreamingFileSink on EMR

2019-02-26 Thread Bruno Aranda
Hi,

I am having the same issue, but it is related to what Kostas is pointing
out. I was trying to stream to the "s3" scheme and not "hdfs", and then
getting that exception.

I have realised that somehow I need to reach the S3RecoverableWriter, and
found out it is in a different library, "flink-s3-fs-hadoop". Still trying
to figure out how to make it work, though. I am aiming for code such as:

val sink = StreamingFileSink
  .forBulkFormat(new Path("s3://"), ...)
  .build()

Cheers,

Bruno

On Tue, 26 Feb 2019 at 14:59, Kostas Kloudas  wrote:

> Hi Kevin,
>
> I cannot find anything obviously wrong from what you describe.
> Just to eliminate the obvious, you are specifying "hdfs" as the scheme for
> your file path, right?
>
> Cheers,
> Kostas
>
> On Tue, Feb 26, 2019 at 3:35 PM Till Rohrmann 
> wrote:
>
>> Hmm good question, I've pulled in Kostas who worked on the
>> StreamingFileSink. He might be able to tell you more in case that there is
>> some special behaviour wrt the Hadoop file systems.
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 26, 2019 at 3:29 PM kb  wrote:
>>
>>> Hi Till,
>>>
>>> The only potential issue in the path I see is
>>> `/usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.29.0.jar`. I double
>>> checked my pom, the project is Hadoop-free. The JM log also shows `INFO
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Hadoop
>>> version: 2.8.5-amzn-1`.
>>>
>>> Best,
>>> Kevin
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: StreamingFileSink on EMR

2019-02-26 Thread Bruno Aranda
Hey,

Got it working. Basically you need to add the flink-s3-fs-hadoop-1.7.2.jar
library from the /opt folder of the Flink distribution into
/usr/lib/flink/lib. That has done the trick for me.

Cheers,

Bruno

On Tue, 26 Feb 2019 at 16:28, kb  wrote:

> Hi Bruno,
>
> Thanks for verifying. We are aiming for the same.
>
> Best,
> Kevin
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: StreamingFileSink on EMR

2019-02-26 Thread Bruno Aranda
Hi,

That jar must exist for all the 1.7 versions, but I was replacing the libs
of the Flink provided by AWS EMR (1.7.0) with the more recent ones. You
could also download the 1.7.0 distribution and copy the
flink-s3-fs-hadoop-1.7.0.jar from there into the /usr/lib/flink/lib folder.

But knowing there is a more recent 1.7 release out there, I prefer
replacing the ones in EMR with that one. To do so, we basically replace the
libs in the /usr/lib/flink/lib folder with the ones from the most recent
distribution.

Cheers,

Bruno

On Tue, 26 Feb 2019 at 21:37, kb  wrote:

> Hi,
>
> So 1.7.2 jar has the fix?
>
> Thanks
> Kevin
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Flink 1.7.2 extremely unstable and losing jobs in prod

2019-03-19 Thread Bruno Aranda
Hi,

This is causing serious instability and data loss in our production
environment. Any help figuring out what's going on here would be really
appreciated.

We recently updated our two EMR clusters from flink 1.6.1 to flink 1.7.2
(running on AWS EMR). The road to the upgrade was fairly rocky, but we felt
like it was working sufficiently well in our pre-production environments
that we rolled it out to prod.

However we're now seeing the jobmanager crash spontaneously several times a
day. There doesn't seem to be any pattern to when this happens - it doesn't
coincide with an increase in the data flowing through the system, nor is it
at the same time of day.

The big problem is that when it recovers, sometimes a lot of the jobs fail
to resume with the following exception:

org.apache.flink.util.FlinkException: JobManager responsible for
2401cd85e70698b25ae4fb2955f96fd0 lost the leadership.
at
org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnection(TaskExecutor.java:1185)
at
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1200(TaskExecutor.java:138)
at
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1625)
//...
Caused by: java.util.concurrent.TimeoutException: The heartbeat of
JobManager with id abb0e96af8966f93d839e4d9395c7697 timed out.
at
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1626)
... 16 more

Starting them manually afterwards doesn't resume from the checkpoint, which
for most jobs means they start from the end of the source Kafka topic. This
means that whenever this surprise jobmanager restart happens, we have a
ticking clock during which we're losing data.

We speculate that those jobs die first and while they wait to be restarted
(they have a 30 second delay strategy), the job manager restarts and does
not recover them? In any case, we have never seen so many job failures and
JM restarts with exactly the same EMR config.

We've got some functionality we're building that uses the StreamingFileSink
over S3 bugfixes in 1.7.2, so rolling back isn't an ideal option.

Looking through the mailing list, we found
https://issues.apache.org/jira/browse/FLINK-11843 - does it seem possible
this might be related?

Best regards,

Bruno


Re: Flink 1.7.2 extremely unstable and losing jobs in prod

2019-03-21 Thread Bruno Aranda
Hi Andrey,

Thanks for your response. I was trying to get the logs somewhere, but they
are biggish (~4Mb). Could you suggest somewhere I could put them?

In any case, I can see exceptions like this:

2019/03/18 10:11:50,763 DEBUG
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Releasing
slot [SlotRequestId{ab89ff271ebf317a13a9e773aca4e9fb}] because: null
2019/03/18 10:11:50,807 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
alert-event-beeTrap-notifier (2ff941926e6ad80ba441d9cfcd7d689d) switched
from state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate all requires slots within timeout of 30 ms. Slots
required: 2, slots allocated: 0
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:991)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
...

It looks like a TM may crash, and then the JM, and then the JM is not able
to find slots for the tasks in a reasonable time frame? Weirdly, we are
running 13 TMs with 6 slots each (we used legacy mode in 1.6), and we
always try to keep an extra TM's worth of free slots just in case. Looking
at the dashboard, I see 12 TMs and 2 free slots, but we tell Flink 13 are
available when we start the session in YARN.

Any ideas? It is way less stable for us these days without having changed
settings much since we started using Flink around 1.2 some time back.

Thanks,

Bruno



On Tue, 19 Mar 2019 at 17:09, Andrey Zagrebin  wrote:

> Hi Bruno,
>
> could you also share the job master logs?
>
> Thanks,
> Andrey
>
> On Tue, Mar 19, 2019 at 12:03 PM Bruno Aranda  wrote:
>
>> Hi,
>>
>> This is causing serious instability and data loss in our production
>> environment. Any help figuring out what's going on here would be really
>> appreciated.
>>
>> We recently updated our two EMR clusters from flink 1.6.1 to flink 1.7.2
>> (running on AWS EMR). The road to the upgrade was fairly rocky, but we felt
>> like it was working sufficiently well in our pre-production environments
>> that we rolled it out to prod.
>>
>> However we're now seeing the jobmanager crash spontaneously several times
>> a day. There doesn't seem to be any pattern to when this happens - it
>> doesn't coincide with an increase in the data flowing through the system,
>> nor is it at the same time of day.
>>
>> The big problem is that when it recovers, sometimes a lot of the jobs
>> fail to resume with the following exception:
>>
>> org.apache.flink.util.FlinkException: JobManager responsible for
>> 2401cd85e70698b25ae4fb2955f96fd0 lost the leadership.
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnection(TaskExecutor.java:1185)
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1200(TaskExecutor.java:138)
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1625)
>> //...
>> Caused by: java.util.concurrent.TimeoutException: The heartbeat of
>> JobManager with id abb0e96af8966f93d839e4d9395c7697 timed out.
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1626)
>> ... 16 more
>>
>> Starting them manually afterwards doesn't resume from checkpoint, which
>> for most jobs means it starts from the end of the source kafka topic. This
>> means whenever this surprise jobmanager restart happens, we have a ticking
>> clock during which we're losing data.
>>
>> We speculate that those jobs die first and while they wait to be
>> restarted (they have a 30 second delay strategy), the job manager restarts
>> and does not recover them? In any case, we have never seen so many job
>> failures and JM restarts with exactly the same EMR config.
>>
>> We've got some functionality we're building that uses the
>> StreamingFileSink over S3 bugfixes in 1.7.2, so rolling back isn't an ideal
>> option.
>>
>> Looking through the mailing list, we found
>> https://issues.apache.org/jira/browse/FLINK-11843 - does it seem
>> possible this might be related?
>>
>> Best regards,
>>
>> Bruno
>>
>


Re: Flink 1.7.2 extremely unstable and losing jobs in prod

2019-03-21 Thread Bruno Aranda
Ok, here it goes:

https://transfer.sh/12qMre/jobmanager-debug.log

In an attempt to make it smaller, I removed the noisy "http wire" entries
and masked a couple of things. Not sure this covers everything you would
like to see.

Thanks!

Bruno

On Thu, 21 Mar 2019 at 15:24, Till Rohrmann  wrote:

> Hi Bruno,
>
> could you upload the logs to https://transfer.sh/ or
> https://gist.github.com/ and then post a link. For further debugging this
> will be crucial. It would be really good if you could set the log level to
> DEBUG.
>
> Concerning the number of registered TMs, the new mode (not the legacy
> mode), no longer respects the `-n` setting when you start a yarn session.
> Instead it will dynamically start as many containers as you need to run the
> submitted jobs. That's why you don't see the spare TM and this is the
> expected behaviour.
>
> The community intends to add support for ranges of how many TMs must be
> active at any given time [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-11078
>
> Cheers,
> Till
>
> On Thu, Mar 21, 2019 at 1:50 PM Bruno Aranda  wrote:
>
>> Hi Andrey,
>>
>> Thanks for your response. I was trying to get the logs somewhere but they
>> are biggish (~4Mb). Do you suggest somewhere I could put them?
>>
>> In any case, I can see exceptions like this:
>>
>> 2019/03/18 10:11:50,763 DEBUG
>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Releasing
>> slot [SlotRequestId{ab89ff271ebf317a13a9e773aca4e9fb}] because: null
>> 2019/03/18 10:11:50,807 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>> alert-event-beeTrap-notifier (2ff941926e6ad80ba441d9cfcd7d689d) switched
>> from state RUNNING to FAILING.
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Could not allocate all requires slots within timeout of 30 ms. Slots
>> required: 2, slots allocated: 0
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:991)
>> at
>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>> ...
>>
>> It looks like a TM may crash, and then the JM. And then the JM is not
>> able to find slots for the tasks in a reasonable time frame? Weirdly, we
>> are running 13 TMs with 6 slots each (we used legacy mode in 1.6), and we
>> always try to keep an extra TM worth of free slots just in case. Looking at
>> the dashboard, I see 12 TMs, 2 free slots, but we tell Flink 13 are
>> available when we start the session in yarn.
>>
>> Any ideas? It is way less stable for us these days without having changed
>> settings much since we started using Flink around 1.2 some time back.
>>
>> Thanks,
>>
>> Bruno
>>
>>
>>
>> On Tue, 19 Mar 2019 at 17:09, Andrey Zagrebin 
>> wrote:
>>
>>> Hi Bruno,
>>>
>>> could you also share the job master logs?
>>>
>>> Thanks,
>>> Andrey
>>>
>>> On Tue, Mar 19, 2019 at 12:03 PM Bruno Aranda 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> This is causing serious instability and data loss in our production
>>>> environment. Any help figuring out what's going on here would be really
>>>> appreciated.
>>>>
>>>> We recently updated our two EMR clusters from flink 1.6.1 to flink
>>>> 1.7.2 (running on AWS EMR). The road to the upgrade was fairly rocky, but
>>>> we felt like it was working sufficiently well in our pre-production
>>>> environments that we rolled it out to prod.
>>>>
>>>> However we're now seeing the jobmanager crash spontaneously several
>>>> times a day. There doesn't seem to be any pattern to when this happens - it
>>>> doesn't coincide with an increase in the data flowing through the system,
>>>> nor is it at the same time of day.
>>>>
>>>> The big problem is that when it recovers, sometimes a lot of the jobs
>>>> fail to resume with the following exception:
>>>>
>>>> org.apache.flink.util.FlinkException: JobManager responsible for
>>>> 2401cd85e70698b25ae4fb2955f96fd0 lost the leadership.
>>>> at
>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnection(TaskExecutor.java:1185)
>>>> at
>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1200(TaskExecutor.java:138)
>>>> at
>>>> org.apache.flink.runtime.taskexecutor.Ta

1.7.2 requires several attempts to start in AWS EMR's Yarn

2019-03-26 Thread Bruno Aranda
Hi,

I wrote recently about our problems with 1.7.2, for which we still haven't
found a solution, and the cluster is very unstable. I now want to point to
a different problem that may be related somehow and that we don't
understand.

When we restart a Flink Session in Yarn, we see it takes a few attempts in
order for the container with the JM to be stable. The following Gist
contains the logs from the 4 attempts before a 5th successful one:

https://gist.github.com/siliconcat/3f6b7869e4796151a6bf23ed5342f516

We fail to see why the JM dies. In the first case, I can see a SIGTERM 15,
so I assume it is the cluster manager killing it or something, but I am not
sure what happens in the other cases, or why the manager would kill that
container. We run 38 streaming jobs and we are using the same resources
that we were using before with Flink 1.6 (for which we were using legacy
mode).

Thanks for any insights. We are losing a lot of hair with 1.7.2...

Cheers,

Bruno


StreamingFileSink seems to be overwriting existing part files

2019-03-29 Thread Bruno Aranda
Hi,

One of the main reasons we moved to version 1.7 (and 1.7.2 in particular)
was because of the possibility of using a StreamingFileSink with S3.

We've configured a StreamingFileSink to use a DateTimeBucketAssigner to
bucket by day. It's got a parallelism of 1 and is writing to S3 from an EMR
cluster in AWS.

We ran the job and after a few hours of activity, manually cancelled it
through the jobmanager API. After confirming that a number of "part-0-x"
files existed in S3 at the expected path, we then started the job again
using the same invocation of the CLI "flink run..." command that was
originally used to start it.

It started writing data to S3 again, starting afresh from "part-0-0", which
gradually overwrote the existing data.

I can understand that not having used a checkpoint gives no indication of
where to resume, but the fact that it overwrites the existing files (as it
starts to write to part-0-0 again) is surprising. One would expect that it
finds the last part and takes the next free number?

We're definitely using the flink-s3-fs-hadoop-1.7.2.jar and don't have the
presto version on the classpath.

Is this its expected behaviour? We have not seen this in the non streaming
versions of the sink.

Best regards,

Bruno


Re: StreamingFileSink seems to be overwriting existing part files

2019-03-29 Thread Bruno Aranda
Hi Kostas,

Put that way, sounds fair enough. Many thanks for the clarification,

Cheers,

Bruno

On Fri, 29 Mar 2019 at 15:32, Kostas Kloudas  wrote:

> Hi Bruno,
>
> This is the expected behaviour as the job starts "fresh", given that you
> did not specify any savepoint/checkpoint to start from.
>
> As for the note that "One would expect that it finds the last part and
> gets the next free number?",
> I am not sure how this can be achieved safely and efficiently in an
> eventually consistent object store like s3.
> This is actually the reason why, contrary to the BucketingSink, the
> StreamingFileSink relies on Flink's own state to determine the "next" part
> counter.
>
> Cheers,
> Kostas
>
> On Fri, Mar 29, 2019 at 4:22 PM Bruno Aranda  wrote:
>
>> Hi,
>>
>> One of the main reasons we moved to version 1.7 (and 1.7.2 in particular)
>> was because of the possibility of using a StreamingFileSink with S3.
>>
>> We've configured a StreamingFileSink to use a DateTimeBucketAssigner to
>> bucket by day. It's got a parallelism of 1 and is writing to S3 from an EMR
>> cluster in AWS.
>>
>> We ran the job and after a few hours of activity, manually cancelled it
>> through the jobmanager API. After confirming that a number of "part-0-x"
>> files existed in S3 at the expected path, we then started the job again
>> using the same invocation of the CLI "flink run..." command that was
>> originally used to start it.
>>
>> It started writing data to S3 again, starting afresh from "part-0-0",
>> which gradually overwrote the existing data.
>>
>> I can understand not having used a checkpoint gives no indication on
>> where to resume, but the fact that it overwrites the existing files (as it
>> starts to write to part-0.0 again) is surprising. One would expect that it
>> finds the last part and gets the next free number?
>>
>> We're definitely using the flink-s3-fs-hadoop-1.7.2.jar and don't have
>> the presto version on the classpath.
>>
>> Is this its expected behaviour? We have not seen this in the non
>> streaming versions of the sink.
>>
>> Best regards,
>>
>> Bruno
>>
>


Re: Flink 1.7.2 extremely unstable and losing jobs in prod

2019-04-08 Thread Bruno Aranda
Hi Till,

Many thanks for your reply and don't worry. We understand this is tricky
and you are busy.

We have been experiencing some issues, and a couple of them have been
addressed, so the logs are probably not relevant anymore.

About losing jobs on restart -> it seems that YARN was killing the
container for the master due to it not passing the liveness probe. Since
Flink 1.1 or something we had been using very stringent liveness probe
timeouts in Yarn to detect very fast when a node in the cluster was going
out of service. This timeout (30 seconds) was probably killing the job
manager before it was able to recover the ~40 streaming jobs that we run in
session mode. I wonder why we had not seen that in 1.6, though, probably
because of the legacy mode?

Extreme instability -> that was caused by running in DEBUG mode to capture
logs; the sheer number of them (especially coming from AsyncFunctions)
caused the disks to fill and YARN to decommission the nodes. We do process
many thousands of messages per second in some of our jobs.

We still have a few instances of Job Managers losing leadership every few
hours (all of them in the cluster). Another of our jobs restarts more
often, but the "Exceptions" tab in the UI for the job just tells us that
"The assigned slot XXX was removed". It would be helpful to see why it was
removed, though.

I am currently looking at those, but the logs don't tell me much (and I
cannot run at such a low log level in this environment anymore). There is
only one thing at ERROR level for when the more unstable job restarts:

java.util.concurrent.TimeoutException: Remote system has been silent for
too long. (more than 48.0 hours),
at
akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:376),

at akka.actor.Actor.aroundReceive(Actor.scala:502),
at akka.actor.Actor.aroundReceive$(Actor.scala:500),
at
akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203),
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526),
at akka.actor.ActorCell.invoke(ActorCell.scala:495),
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257),
at akka.dispatch.Mailbox.run(Mailbox.scala:224),
at akka.dispatch.Mailbox.exec(Mailbox.scala:234),
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289),
at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056),

at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692),
at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

And for the other issue, all the job managers losing leadership, there are
just some warnings about the association with the remote system failing,
i.e.:

Remote connection to [null] failed with java.net.ConnectException:
Connection refused: ip-10-10-56-193.eu-west-1.compute.internal/
10.10.56.193:43041
Association with remote system
[akka.tcp://fl...@ip-10-10-56-193.eu-west-1.compute.internal:43041] has
failed, address is now gated for [50] ms. Reason: [Association failed with
[akka.tcp://fl...@ip-10-10-56-193.eu-west-1.compute.internal:43041]] Caused
by: [Connection refused: ip-10-10-56-193.eu-west-1.compute.internal/
10.10.56.193:43041]

over and over again...

Thanks for any insight.

Bruno

On Mon, 8 Apr 2019 at 10:45, Till Rohrmann  wrote:

> Hi Bruno,
>
> sorry for getting back to you so late. I just tried to access your logs to
> investigate the problem but transfer.sh tells me that they are no longer
> there. Could you maybe re-upload them or directly send them to my mail
> address. Sorry for not taking faster a look at your problem and the
> inconveniences with the upload.
>
> Cheers,
> Till
>
> On Thu, Mar 21, 2019 at 4:30 PM Bruno Aranda  wrote:
>
>> Ok, here it goes:
>>
>> https://transfer.sh/12qMre/jobmanager-debug.log
>>
>> In an attempt to make it smaller, did remove the noisy "http wire" ones
>> and masked a couple of things. Not sure this covers everything you would
>> like to see.
>>
>> Thanks!
>>
>> Bruno
>>
>> On Thu, 21 Mar 2019 at 15:24, Till Rohrmann  wrote:
>>
>>> Hi Bruno,
>>>
>>> could you upload the logs to https://transfer.sh/ or
>>> https://gist.github.com/ and then post a link. For further debugging
>>> this will be crucial. It would be really good if you could set the log
>>> level to DEBUG.
>>>
>>> Concerning the number of registered TMs, the new mode (not the legacy
>>> mode), no longer respects the `-n` setting when you start a yarn session.
>>> Instead it will dynamically start as many containers as you need to run the
>>> submitted jobs. That's why you don't see the spare TM and this is the
>>> expected behaviour.
>>>
>>> The community intends to add support for ranges of how ma

Re: Adding metadata to the jar

2019-04-08 Thread Bruno Aranda
Hi Avi,

Don't know if there are better ways, but we store the version of the
running job and other metadata as part of the "User configuration" of the
job, so it shows in the UI when you go to the Configuration tab inside the
job. To do so, when we create the job:

val buildInfo = new Configuration()
buildInfo.setString("version", "0.1.0")

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setGlobalJobParameters(buildInfo)
...

It helps us to have a convenient way of knowing what version of the jobs
are running, when they were built, etc...
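
If the jobs themselves ever need that information, it can also be read back
inside an operator (a sketch, assuming the global job parameters were set
to a Configuration as above; the function name is just an example):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class VersionTagger extends RichMapFunction[String, String] {
  override def map(value: String): String = {
    // the global job parameters are the Configuration we registered at job creation
    val params = getRuntimeContext.getExecutionConfig
      .getGlobalJobParameters.asInstanceOf[Configuration]
    s"${params.getString("version", "unknown")}: $value"
  }
}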

Cheers,

Bruno


On Mon, 8 Apr 2019 at 18:04, Avi Levi  wrote:

> Is there a way to add some metadata to the jar and see it on dashboard ? I
> couldn't find a way to do so but I think it very useful.
> Consider that you want to know which version is actually running in the
> job manager (not just which jar is uploaded which is not necessary being
> running at the moment ), AFAIK by looking at dashboard there is no way to
> know which jar / version is actually executing. Well as a workaround you
> can add something to the job name but this is a limited option, what if one
> wants to add more info programatically?
>
> Is there a way to do it ?
>
> BR
> Avi
>


Re: Flink 1.7.2 extremely unstable and losing jobs in prod

2019-04-09 Thread Bruno Aranda
Thanks Till, I will start separate threads for the two issues we are
experiencing.

Cheers,

Bruno

On Mon, 8 Apr 2019 at 15:27, Till Rohrmann  wrote:

> Hi Bruno,
>
> first of all good to hear that you could resolve some of the problems.
>
> Slots get removed if a TaskManager gets unregistered from the SlotPool.
> This usually happens if a TaskManager closes its connection or its
> heartbeat with the ResourceManager times out. So you could look for
> messages like "The heartbeat of TaskManager with id ... timed out".
>
> If JobManagers lose its leadership, it could also have something to do
> with the ZooKeeper cluster and the configuration of Flink's ZooKeeper
> client [1] or your network in general.
>
> For the warnings you see, it's hard to say without the full picture. Could
> `10.10.56.193:43041` be a TaskManager which just died and, hence, cannot
> be connected to anymore?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#zookeeper-based-ha-mode
>
> Cheers,
> Till
>
> On Mon, Apr 8, 2019 at 12:33 PM Bruno Aranda  wrote:
>
>> Hi Till,
>>
>> Many thanks for your reply and don't worry. We understand this is tricky
>> and you are busy.
>>
>> We have been experiencing some issues, and a couple of them have been
>> addressed, so the logs probably were not relevant anymore.
>>
>> About losing jobs on restart -> it seems that YARN was killing the
>> container for the master due to it not passing the liveness probe. Since
>> Flink 1.1 or something we had been using very stringent liveness probe
>> timeouts in Yarn to detect very fast when a node in the cluster was going
>> out of service. This timeout (30 seconds) was probably killing the job
>> manager before it was able to recover the ~40 streaming jobs that we run in
>> session mode. I wonder why we had not seen that in 1.6, though, probably
>> because of the legacy mode?
>>
>> Extremely high unstability -> that was caused because we were running in
>> DEBUG mode to capture logs and the sheer number of them (especially coming
>> from AsyncFunctions) did cause the disks to fill and YARN to decomission
>> the nodes. We do process many thousands of messages per second in some of
>> our jobs.
>>
>> We still have a few instances of Job Managers losing leadership every few
>> hours (all of them in the cluster). Another of our jobs restarts more
>> often, but the "Exceptions" tab in the UI for the job just tells us that
>> "The assigned slot XXX was removed". It would be helpful to see why it was
>> removed, though.
>>
>> I am currently looking at those. But the logs don't tell me much (and I
>> cannot run them in this environment with such a low level anymore). There
>> is only one thing at ERROR level for when the more unstable job restarts:
>>
>> java.util.concurrent.TimeoutException: Remote system has been silent for
>> too long. (more than 48.0 hours),
>> at
>> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:376),
>>
>> at akka.actor.Actor.aroundReceive(Actor.scala:502),
>> at akka.actor.Actor.aroundReceive$(Actor.scala:500),
>> at
>> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203),
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526),
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495),
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257),
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224),
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234),
>> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289),
>> at
>> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056),
>>
>> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692),
>> at
>> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>>
>> And for the other, all the job managers losing leadership, just some
>> warnings about the association with the remote system failing, ie:
>>
>> Remote connection to [null] failed with java.net.ConnectException:
>> Connection refused: ip-10-10-56-193.eu-west-1.compute.internal/
>> 10.10.56.193:43041
>> Association with remote system
>> [akka.tcp://fl...@ip-10-10-56-193.eu-west-1.compute.internal:43041] has
>> failed, address is now gated for [50] ms. Reason: [Association failed with
>> [akka.tcp://fl...@ip-10-10-56-193.eu-west-1.compute.internal:43041]]
>> Caused by: [Connection refused: ip-10-10-56-193.eu-west-1.compute.internal/
>> 10.10.56.193:43041]
>>

Re: Rolling sink parquet/Avro output

2017-01-18 Thread Bruno Aranda
Sorry, something went wrong with the code for the Writer. Here it is again:

import org.apache.avro.Schema
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

@SerialVersionUID(1L)
class MyAvroParquetWriter[T](schema: String) extends Writer[T] {

  @transient private var writer: ParquetWriter[T] = _

  override def open(fs: FileSystem, path: Path): Unit = {
writer = AvroParquetWriter.builder[T](path)
  .withSchema(new Schema.Parser().parse(schema))
  .withCompressionCodec(CompressionCodecName.SNAPPY)
  .build()
  }

  override def write(element: T): Unit = writer.write(element)

  override def duplicate(): Writer[T] = new MyAvroParquetWriter[T](schema)

  override def close(): Unit = writer.close()

  override def getPos: Long = writer.getDataSize

  override def flush(): Long = writer.getDataSize

}

Using this library as a dependency: "org.apache.parquet" % "parquet-avro" %
"1.8.1". We use this writer in a rolling sink and it seems fine so far.

Cheers,

Bruno

On Wed, 18 Jan 2017 at 09:09 elmosca  wrote:

> Hi Biswajit,
>
> We use the following Writer for Parquet using Avro conversion (using
> Scala):
>
>
>
> Using this library as dependency: "org.apache.parquet" % "parquet-avro" %
> "1.8.1". We use this writer in a rolling sink and seems fine so far.
>
> Cheers,
>
> Bruno
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Rolling-sink-parquet-Avro-output-tp11123p11127.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Cannot cancel job with savepoint due to timeout

2017-01-31 Thread Bruno Aranda
Hi there,

I am trying to cancel a job and create a savepoint (i.e. flink cancel -s),
but it takes more than a minute to do that and then it fails due to the
timeout. However, it seems that the job is cancelled successfully and the
savepoint made, but I can only see that through the dashboard.

Cancelling job 790b60a2b44bc98854782d4e0cac05d5 with savepoint to default
savepoint directory.


 The program finished with the following exception:

java.util.concurrent.TimeoutException: Futures timed out after [6
milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at scala.concurrent.Await.result(package.scala)
at org.apache.flink.client.CliFrontend.cancel(CliFrontend.java:618)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1079)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
at
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1117)

Is there any way to configure this timeout? So we can depend on the outcome
of this execution for scripts, etc.

Thanks!

Bruno


Re: Cannot cancel job with savepoint due to timeout

2017-02-01 Thread Bruno Aranda
Maybe, though it would be good to be able to override it on the command line
somehow. I guess I could just change the Flink config.
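
For reference, the change I have in mind is just a line in flink-conf.yaml
along these lines (assuming the setting Yury mentions below; the value is only
an example):

# flink-conf.yaml
akka.client.timeout: 10 min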

Many thanks Yury,

Bruno

On Wed, 1 Feb 2017 at 07:40 Yury Ruchin  wrote:

> Hi Bruno,
>
> From the code I conclude that "akka.client.timeout" setting is what
> affects this. It defaults to 60 seconds.
>
> I'm not sure why this setting is not documented though as well as many
> other "akka.*" settings - maybe there are some good reasons behind.
>
> Regards,
> Yury
>
> 2017-01-31 17:47 GMT+03:00 Bruno Aranda :
>
> Hi there,
>
> I am trying to cancel a job and create a savepoint (i.e. flink cancel -s),
> but it takes more than a minute to do that and then it fails due to the
> timeout. However, it seems that the job will be cancelled successfully and
> the savepoint made, but I can only see that through the dashboard.
>
> Cancelling job 790b60a2b44bc98854782d4e0cac05d5 with savepoint to default
> savepoint directory.
>
> 
>  The program finished with the following exception:
>
> java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:190)
> at scala.concurrent.Await.result(package.scala)
> at org.apache.flink.client.CliFrontend.cancel(CliFrontend.java:618)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1079)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1117)
>
> Is there any way to configure this timeout? So we can depend on the
> outcome of this execution for scripts, etc.
>
> Thanks!
>
> Bruno
>
>
>


Re: Can't run flink on yarn on version 1.2.0

2017-02-17 Thread Bruno Aranda
Hi Howard,

We run Flink 1.2 on YARN without issues. Sorry, I don't have a specific
solution, but are you sure you don't have some sort of Flink version mix? In
your logs I can see:

The configuration directory ('/home/software/flink-1.1.4/conf') contains
both LOG4J and Logback configuration files. Please delete or rename one of
them.

It mentions 1.1.4 in the conf dir path instead of 1.2.

Cheers,

Bruno

On Fri, 17 Feb 2017 at 08:50 Howard,Li(vip.com) 
wrote:

> Hi,
>
>  I’m trying to run flink on yarn by using command: bin/flink run
> -m yarn-cluster -yn 2 -ys 4 ./examples/batch/WordCount.jar
>
>  But I got the following error:
>
>
>
> 2017-02-17 15:52:40,746 INFO
> org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for
> the flink jar passed. Using the location of class
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
>
> 2017-02-17 15:52:40,746 INFO
> org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for
> the flink jar passed. Using the location of class
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
>
> 2017-02-17 15:52:40,775 INFO
> org.apache.flink.yarn.YarnClusterDescriptor   - Using
> values:
>
> 2017-02-17 15:52:40,775 INFO
> org.apache.flink.yarn.YarnClusterDescriptor   -
> TaskManager count = 2
>
> 2017-02-17 15:52:40,775 INFO
> org.apache.flink.yarn.YarnClusterDescriptor   -
> JobManager memory = 1024
>
> 2017-02-17 15:52:40,775 INFO
> org.apache.flink.yarn.YarnClusterDescriptor   -
> TaskManager memory = 1024
>
> 2017-02-17 15:52:40,796 INFO
> org.apache.hadoop.yarn.client.RMProxy - Connecting
> to ResourceManager at /0.0.0.0:8032
>
> 2017-02-17 15:52:41,680 WARN
> org.apache.flink.yarn.YarnClusterDescriptor   - The
> configuration directory ('/home/software/flink-1.1.4/conf') contains both
> LOG4J and Logback configuration files. Please delete or rename one of them.
>
> 2017-02-17 15:52:41,702 INFO
> org.apache.flink.yarn.Utils   - Copying
> from file:/home/software/flink-1.1.4/conf/logback.xml to hdfs://
> 10.199.202.161:9000/user/root/.flink/application_1487247313588_0017/logback.xml
>
> 2017-02-17 15:52:42,025 INFO
> org.apache.flink.yarn.Utils   - Copying
> from file:/home/software/flink-1.1.4/lib to hdfs://
> 10.199.202.161:9000/user/root/.flink/application_1487247313588_0017/lib
>
> 2017-02-17 15:52:42,695 INFO
> org.apache.flink.yarn.Utils   - Copying
> from file:/home/software/flink-1.1.4/conf/log4j.properties to hdfs://
> 10.199.202.161:9000/user/root/.flink/application_1487247313588_0017/log4j.properties
>
> 2017-02-17 15:52:42,722 INFO
> org.apache.flink.yarn.Utils   - Copying
> from file:/home/software/flink-1.1.4/lib/flink-dist_2.10-1.1.4.jar to
> hdfs://
> 10.199.202.161:9000/user/root/.flink/application_1487247313588_0017/flink-dist_2.10-1.1.4.jar
>
> 2017-02-17 15:52:43,346 INFO
> org.apache.flink.yarn.Utils   - Copying
> from /home/software/flink-1.1.4/conf/flink-conf.yaml to hdfs://
> 10.199.202.161:9000/user/root/.flink/application_1487247313588_0017/flink-conf.yaml
>
> 2017-02-17 15:52:43,386 INFO
> org.apache.flink.yarn.YarnClusterDescriptor   - Submitting
> application master application_1487247313588_0017
>
> 2017-02-17 15:52:43,425 INFO
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted
> application application_1487247313588_0017
>
> 2017-02-17 15:52:43,425 INFO
> org.apache.flink.yarn.YarnClusterDescriptor   - Waiting for
> the cluster to be allocated
>
> 2017-02-17 15:52:43,427 INFO
> org.apache.flink.yarn.YarnClusterDescriptor   - Deploying
> cluster, current state ACCEPTED
>
> 2017-02-17 15:52:48,471 INFO
> org.apache.flink.yarn.YarnClusterDescriptor   - YARN
> application has been deployed successfully.
>
> Cluster started: Yarn cluster with application id
> application_1487247313588_0017
>
> Using address 10.199.202.162:43809 to connect to JobManager.
>
> JobManager web interface address
> http://vip-rc-ucsww.vclound.com:8088/proxy/application_1487247313588_0017/
>
> Using the parallelism provided by the remote cluster (8). To use another
> parallelism, set it at the ./bin/flink client.
>
> Starting execution of program
>
> 2017-02-17 15:52:49,278 INFO
> org.apache.flink.yarn.YarnClusterClient   - Starting
> program in interactive mode
>
> Executing WordCount example with default input data set.
>
> Use --input to specify file input.
>
> Printing result to stdout. Use --output to specify output path.
>
> 2017-02-17 15:52:49,609 INFO
> org.apache.flink.yarn.YarnClusterClient   - Waiting
> until all TaskManagers have connected
>
> Waiting until all TaskManagers have connected
>

Any good ideas for online/offline detection of devices that send events?

2017-03-03 Thread Bruno Aranda
Hi all,

We are trying to write an online/offline detector for devices that keep
streaming data through Flink. We know roughly how often to expect events from
those devices, and we want to be able to detect when any of them stops sending
events through the pipeline (goes offline) or starts sending again (comes back
online). For instance, if 5 minutes have passed since the last event of a
device, we would fire an event to indicate that the device is offline.

The data from the devices comes through Kafka, with its own event time. The
device events are in order within the partitions and each device goes to a
specific partition, so in theory we should not see out-of-order events when
looking at one partition.

We are assuming that a good way to do this is to use sliding windows that are
big enough to see the relevant gap before/after the events for each specific
device.

We were wondering if there are other ideas on how to solve this.

Many thanks!

Bruno


Re: Any good ideas for online/offline detection of devices that send events?

2017-03-07 Thread Bruno Aranda
Hi Gordon,

Many thanks for your helpful ideas. We tried the CEP approach yesterday, but
could not figure it out. The ProcessFunction one looks more promising and we
are investigating it, though we are fighting some issues related to event
time: so far we do not see the timer triggered at the right event time. We are
using ascending timestamps, but at the moment the timers fire too late.
Investigating more.
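
For reference, the rough shape of what we are trying is something like the
sketch below (type names, the key and the gap are placeholders, and the exact
ProcessFunction base class/state API may differ a bit between Flink versions):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

case class DeviceEvent(deviceId: String, timestamp: Long)
case class StatusChange(deviceId: String, online: Boolean)

// Emits an "offline" marker when no event arrives for a device within gapMs,
// and an "online" marker when a device marked offline sends something again.
class OnlineOfflineDetector(gapMs: Long)
    extends ProcessFunction[DeviceEvent, StatusChange] {

  @transient private var deviceId: ValueState[String] = _
  @transient private var lastTimer: ValueState[java.lang.Long] = _
  @transient private var offline: ValueState[java.lang.Boolean] = _

  override def open(parameters: Configuration): Unit = {
    deviceId = getRuntimeContext.getState(
      new ValueStateDescriptor("deviceId", classOf[String]))
    lastTimer = getRuntimeContext.getState(
      new ValueStateDescriptor("lastTimer", classOf[java.lang.Long]))
    offline = getRuntimeContext.getState(
      new ValueStateDescriptor("offline", classOf[java.lang.Boolean]))
  }

  override def processElement(event: DeviceEvent,
      ctx: ProcessFunction[DeviceEvent, StatusChange]#Context,
      out: Collector[StatusChange]): Unit = {
    deviceId.update(event.deviceId)
    // The device was marked offline and has sent something again: back online.
    if (offline.value() == java.lang.Boolean.TRUE) {
      out.collect(StatusChange(event.deviceId, online = true))
      offline.update(false)
    }
    // (Re)arm the inactivity timer; only the most recent one counts in onTimer.
    val timerTs = ctx.timestamp() + gapMs
    ctx.timerService().registerEventTimeTimer(timerTs)
    lastTimer.update(timerTs)
  }

  override def onTimer(timestamp: Long,
      ctx: ProcessFunction[DeviceEvent, StatusChange]#OnTimerContext,
      out: Collector[StatusChange]): Unit = {
    // Fires once the watermark passes the timer, i.e. no newer event re-armed it.
    val expected = lastTimer.value()
    if (expected != null && expected.longValue() == timestamp
        && offline.value() != java.lang.Boolean.TRUE) {
      out.collect(StatusChange(deviceId.value(), online = false))
      offline.update(true)
    }
  }
}

We key the stream by device id and apply it with something like
stream.keyBy(_.deviceId).process(new OnlineOfflineDetector(5 * 60 * 1000L)).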

Thanks,

Bruno

On Tue, 7 Mar 2017 at 07:49 Tzu-Li (Gordon) Tai  wrote:

> Some more input:
>
> Right now, you can also use the `ProcessFunction` [1] available in Flink
> 1.2 to simulate state TTL.
> The `ProcessFunction` should allow you to keep device state and simulate
> the online / offline detection by registering processing timers. In the
> `onTimer` callback, you can emit the “offline” marker event downstream, and
> in the `processElement` method, you can emit the “online” marker event if
> the case is the device has sent an event after it was determined to be
> offline.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
>
>
> On March 6, 2017 at 9:40:28 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org)
> wrote:
>
> Hi Bruno!
>
> The Flink CEP library also seems like an option you can look into to see
> if it can easily realize what you have in mind.
>
> Basically, the pattern you are detecting is a timeout of 5 minutes after
> the last event. Once that pattern is detected, you emit a “device offline”
> event downstream.
> With this, you can also extend the pattern output stream to detect whether
> a device has become online again.
>
> Here are some materials for you to take a look at Flink CEP:
> 1. http://flink.apache.org/news/2016/04/06/cep-monitoring.html
> 2.
> https://www.slideshare.net/FlinkForward/fabian-huesketill-rohrmann-declarative-stream-processing-with-streamsql-and-cep?qid=3c13eb7d-ed39-4eae-9b74-a6c94e8b08a3&v=&b=&from_search=4
>
> The CEP parts in the slides in 2. also provide some good examples of
> timeout detection using CEP.
>
> Hope this helps!
>
> Cheers,
> Gordon
>
> On March 4, 2017 at 1:27:51 AM, Bruno Aranda (bara...@apache.org) wrote:
>
> Hi all,
>
> We are trying to write an online/offline detector for devices that keep
> streaming data through Flink. We know how often roughly to expect events
> from those devices and we want to be able to detect when any of them stops
> (goes offline) or starts again (comes back online) sending events through
> the pipeline. For instance, if 5 minutes have passed since the last event
> of a device, we would fire an event to indicate that the device is offline.
>
> The data from the devices comes through Kafka, with its own event time.
> The device events are in order within the partitions and each device goes to
> a specific partition, so in theory we should not see out-of-order events when
> looking at one partition.
>
> We are assuming a good way to do this is by using sliding windows that are
> big enough, so we can see the relevant gap before/after the events for each
> specific device.
>
> We were wondering if there are other ideas on how to solve this.
>
> Many thanks!
>
> Bruno
>
>


Re: AWS exception serialization problem

2017-03-08 Thread Bruno Aranda
Hi,

We have seen something similar in Flink 1.2. We have an operation that
parses some JSON, and when it fails to parse it, we can see the
ClassNotFoundException for the relevant exception (in our case
JsResultException from the play-json library). The library is indeed in the
shaded JAR, otherwise we would not be able to parse the JSON.
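
For context, the operation itself is essentially a map that parses with
play-json, roughly like this (names are made up):

import play.api.libs.json.{Json, Reads}

case class Reading(deviceId: String, value: Double)

object ReadingParser {

  implicit val readingReads: Reads[Reading] = Json.reads[Reading]

  // Throws play.api.libs.json.JsResultException when the payload does not
  // validate, which is the exception class the ClassNotFoundException refers to.
  def parse(raw: String): Reading = Json.parse(raw).as[Reading]
}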

Cheers,

Bruno

On Wed, 8 Mar 2017 at 12:57 Tzu-Li (Gordon) Tai  wrote:

> Hi Shannon,
>
> Just to clarify:
>
> From the error trace, it seems like that the messages fetched from Kafka
> are serialized `AmazonS3Exception`s, and you’re emitting a stream of
> `AmazonS3Exception` as records from FlinkKafkaConsumer?
> Is this correct? If so, I think we should just make sure that the
> `com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the
> user fat jar.
>
> Also, what is the Flink version you are using?
>
> Cheers,
> Gordon
>


Re: AWS exception serialization problem

2017-03-08 Thread Bruno Aranda
Hi Stephan, we are running Flink 1.2.0 on Yarn (AWS EMR cluster)

On Wed, 8 Mar 2017, 21:41 Stephan Ewen,  wrote:

> @Bruno: How are you running Flink? On yarn, standalone, mesos, docker?
>
> On Wed, Mar 8, 2017 at 2:13 PM, Bruno Aranda 
> wrote:
>
> Hi,
>
> We have seen something similar in Flink 1.2. We have an operation that
> parses some JSON, and when it fails to parse it, we can see the
> ClassNotFoundException for the relevant exception (in our case
> JsResultException from the play-json library). The library is indeed in the
> shaded JAR, otherwise we would not be able to parse the JSON.
>
> Cheers,
>
> Bruno
>
> On Wed, 8 Mar 2017 at 12:57 Tzu-Li (Gordon) Tai 
> wrote:
>
> Hi Shannon,
>
> Just to clarify:
>
> From the error trace, it seems like that the messages fetched from Kafka
> are serialized `AmazonS3Exception`s, and you’re emitting a stream of
> `AmazonS3Exception` as records from FlinkKafkaConsumer?
> Is this correct? If so, I think we should just make sure that the
> `com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the
> user fat jar.
>
> Also, what is the Flink version you are using?
>
> Cheers,
> Gordon
>
>
>


Re: Telling if a job has caught up with Kafka

2017-03-17 Thread Bruno Aranda
Hi,

We are interested in this too. So far we tag the records with timestamps at
different points of the pipeline and use metric gauges to measure the latency
between the different components, but it would be good to know if there is
something more specific to Kafka that we can do out of the box in Flink.
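
For what it is worth, the gauge side of it is little more than this (the record
type and metric name are made up):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Gauge

case class Event(id: String, eventTimeMs: Long)

// Passes records through unchanged and exposes, per subtask, the lag between
// the wall clock and the event time of the last record seen.
class LatencyGaugeMap extends RichMapFunction[Event, Event] {

  @volatile private var lastLagMs: Long = 0L

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext.getMetricGroup
      .gauge[Long, Gauge[Long]]("eventTimeLagMs", new Gauge[Long] {
        override def getValue: Long = lastLagMs
      })
  }

  override def map(event: Event): Event = {
    lastLagMs = System.currentTimeMillis() - event.eventTimeMs
    event
  }
}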

Cheers,

Bruno

On Fri, 17 Mar 2017 at 10:07 Florian König 
wrote:

> Hi,
>
> thank you Gyula for posting that question. I’d also be interested in how
> this could be done.
>
> You mentioned the dependency on the commit frequency. I’m using
> https://github.com/quantifind/KafkaOffsetMonitor. With the 08 Kafka
> consumer a job's offsets as shown in the diagrams updated a lot more
> regularly than the checkpointing interval. With the 10 consumer a commit is
> only made after a successful checkpoint (or so it seems).
>
> Why is that so? The checkpoint contains the Kafka offset and would be able
> to start reading wherever it left off, regardless of any offset stored in
> Kafka or Zookeeper. Why is the offset not committed regularly,
> independently from the checkpointing? Or did I misconfigure anything?
>
> Thanks
> Florian
>
> > Am 17.03.2017 um 10:26 schrieb Gyula Fóra :
> >
> > Hi All,
> >
> > I am wondering if anyone has some nice suggestions on what would be the
> simplest/best way of telling if a job is caught up with the Kafka input.
> > An alternative question would be how to tell if a job is caught up to
> another job reading from the same topic.
> >
> > The first thing that comes to my mind is looking at the offsets Flink
> commits to Kafka. However this will only work if every job uses a different
> group id and even then it is not very reliable depending on the commit
> frequency.
> >
> > The use case I am trying to solve is fault tolerant update of a job, by
> taking a savepoint for job1 starting job2 from the savepoint, waiting until
> it catches up and then killing job1.
> >
> > Thanks for your input!
> > Gyula
>
>
>


Re: Telling if a job has caught up with Kafka

2017-03-20 Thread Bruno Aranda
Hi,

Thanks! The proposal sounds very good to us too.

Bruno

On Sun, 19 Mar 2017 at 10:57 Florian König 
wrote:

> Thanks Gordon for the detailed explanation! That makes sense and explains
> the expected behaviour.
>
> The JIRA for the new metric also sounds very good. Can’t wait to have this
> in the Flink GUI (KafkaOffsetMonitor has some problems and stops working
> after 1-2 days, don’t know the reason yet).
>
> All the best,
> Florian
>
>
> > Am 18.03.2017 um 08:38 schrieb Tzu-Li (Gordon) Tai  >:
> >
> > @Florian
> > the 0.9 / 0.10 version and 0.8 version behave a bit differently right
> now for the offset committing.
> >
> > In 0.9 / 0.10, if checkpointing is enabled, the “auto.commit.enable”
> etc. settings will be completely ignored and overwritten before being used to
> instantiate the internal Kafka clients, hence committing will only happen
> on Flink checkpoints.
> >
> > In 0.8, this isn’t the case. Both automatic periodic committing and
> committing on checkpoints can take place. That’s perhaps why you’re
> observing the 0.8 consumer to be committing more frequently.
> >
> > FYI: This behaviour will be unified in Flink 1.3.0. If you’re
> interested, you can take a look at
> https://github.com/apache/flink/pull/3527.
> >
> > - Gordon
> >
> >
> > On March 17, 2017 at 6:07:38 PM, Florian König (
> florian.koe...@micardo.com) wrote:
> >
> >> Why is that so? The checkpoint contains the Kafka offset and would be
> able to start reading wherever it left off, regardless of any offset stored
> in Kafka or Zookeeper. Why is the offset not committed regularly,
> independently from the checkpointing? Or did I misconfigure anything?
>
>
>


Flink Graphite Reporter stops reporting via TCP if network issue

2017-05-05 Thread Bruno Aranda
Hi,

We are using the Graphite reporter from Flink 1.2.0 to send the metrics via
TCP. Due to our network configuration we cannot use UDP at the moment.
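
For reference, the reporter is configured the standard way in flink-conf.yaml,
something like this (host and port are placeholders):

metrics.reporters: graphite
metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.graphite.host: graphite.internal.example.com
metrics.reporter.graphite.port: 2003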

We have observed that if there is any problem with Graphite or the network
(basically, the TCP connection times out or something similar), the metrics
reporter does not recover. This is easy to reproduce by blocking the port we
send the metrics to using iptables. If we block the port for more than a
minute or so, the problem happens. After the port is re-opened, Flink does not
resume reporting as before.

Is this a known issue? Googling shows some problems with the metrics-graphite
package that should have been solved already. We have tried updating
metrics-core/metrics-graphite to the latest version with no success.

Any ideas?

Thanks!

Bruno


Re: Flink Graphite Reporter stops reporting via TCP if network issue

2017-05-05 Thread Bruno Aranda
Hi Chesnay,

Many thanks for your reply. In the end, we have decided to change the
infrastructure a bit and use StatsD instead. This way, we don't need a custom
reporter and it works fine.
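
For completeness, on the Flink side the switch was just pointing the metrics at
the StatsD reporter in flink-conf.yaml, along these lines (host and port are
placeholders):

metrics.reporters: statsd
metrics.reporter.statsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.statsd.host: statsd.internal.example.com
metrics.reporter.statsd.port: 8125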

Thanks!

Bruno

On Fri, 5 May 2017 at 13:20 Chesnay Schepler  wrote:

> Hello,
>
> for Graphite, Flink uses the DropWizard metrics reporter. I don't know
> at the moment whether it supports any kind of reconnecting functionality.
>
> I'm not sure whether I understood you correctly; did you try upgrading
> the DropWizard metrics-core/metrics-graphite dependencies?
>
> If that didn't do the trick we could in fact implement this in Flink; it
> would be a hack though. When an error occurs we can simply re-instantiate
> the reporter, but we would have to know how the reporter communicates
> the connection drop; i.e. whether it throws some exception or not.
>
> Could you check the log for warning statements from the MetricRegistry?
>
> Regards,
> Chesnay
>
> On 05.05.2017 13:26, Bruno Aranda wrote:
> > Hi,
> >
> > We are using the Graphite reporter from Flink 1.2.0 to send the
> > metrics via TCP. Due to our network configuration we cannot use UDP at
> > the moment.
> >
> > We have observed that if there is any problem with graphite our the
> > network, basically, the TCP connection times out or something, the
> > metrics reporter does not recover. This is easy to reproduce by
> > blocking the port we are sending the metrics using iptables. If we
> > block the port for more than a minute or so, the problem will happen.
> > After the port is re-open, Flink does not continue like before.
> >
> > Is this a known issue? Googling shows some problems with the
> > metrics-graphite package that should have been solved already. We have
> > trying updated metrics-core/graphite to the latest with no success.
> >
> > Any ideas?
> >
> > Thanks!
> >
> > Bruno
>
>
>