Re: Flink 1.5 + resource elasticity resulting in overloaded workers

2018-12-21 Thread jelmer
@flink comitters

I get that you don't want to be aware of task managers, but would it make
sense to change the SlotManager (I briefly looked over the code and I think
that's the class responsible for this) so that it randomly selects slots?
Or to add an option that makes it do this, if it is not something you would
want to do by default?

It's not going to be perfect, but at least you would always end up in a
better spot than you do now in a standalone setup.
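
To be clear, I mean something conceptually like this (purely illustrative
pseudo-code, not the actual SlotManager internals):

import scala.util.Random

// instead of always handing out the first free slot, pick one at random
def pickSlot[S](freeSlots: IndexedSeq[S]): S =
  freeSlots(Random.nextInt(freeSlots.size))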


On Wed, 19 Dec 2018 at 20:26, jelmer  wrote:

> Hi, We recently upgraded to flink 1.6 and seem to be suffering from the
> issue described in this email
>
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-1-5-job-distribution-over-cluster-nodes-tt23364.html
>
> Our workers have 8 slots. Some workers are fully loaded and as a
> consequence have to cope with heavy load during peak times, while other
> workers sit completely idle and have 0 jobs assigned to their slots.
>
> Is there any workaround for this, short of reducing the number of slots
> on a worker?
>
> We need to have double the slots available in order to cope with
> availability zone maintenance. So if we were to reduce the number of
> slots we'd have to add new nodes that would then mostly sit idle.
>
>
>
>


Flink 1.5 + resource elasticity resulting in overloaded workers

2018-12-19 Thread jelmer
Hi, We recently upgraded to flink 1.6 and seem to be suffering from the
issue described in this email

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-1-5-job-distribution-over-cluster-nodes-tt23364.html

Our workers have 8 slots. Some workers are fully loaded and as a
consequence have to cope with heavy load during peak times, while other
workers sit completely idle and have 0 jobs assigned to their slots.

Is there any workaround for this, short of reducing the number of slots on
a worker?

We need to have double the slots available in order to cope with
availability zone maintenance. So if we were to reduce the number of
slots we'd have to add new nodes that would then mostly sit idle.


AskTimeoutException when canceling job with savepoint on flink 1.6.0

2018-09-05 Thread jelmer
I am trying to upgrade a job from flink 1.4.2 to 1.6.0

When we do a deploy we cancel the job with a savepoint, then deploy the new
version of the job from that savepoint. Because our jobs tend to have a lot
of state, it often takes multiple minutes for our savepoints to complete.
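
For context, the deploy flow is the standard CLI sequence (the job id,
savepoint directory and jar name are placeholders):

bin/flink cancel -s hdfs:///flink/savepoints <job-id>
bin/flink run -s hdfs:///flink/savepoints/savepoint-xyz new-job-version.jar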

On flink 1.4.2 we set *akka.client.timeout* to a high value to make sure
the request did not time out.

However, on flink 1.6.0 I get an *AskTimeoutException*, and increasing
*akka.client.timeout* only works if I apply it to the running flink process.
Applying it to just the flink client does nothing.

I am reluctant to configure this on the container itself because AFAIK it
applies to everything inside flink's internal actor system, not just to
creating savepoints.
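
For reference, the setting in question is this line in flink-conf.yaml on
the cluster side (the value is just an example):

akka.client.timeout: 30 min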

What is the correct way to use cancel with savepoint for jobs with lots of
state in flink 1.6.0?

I attached the error.
Cancelling job 9068efa57d6599e7d6a71b7f7eac7d2f with savepoint to default 
savepoint directory.


 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not cancel job 
9068efa57d6599e7d6a71b7f7eac7d2f.
at 
org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:604)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:979)
at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:596)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: java.util.concurrent.ExecutionException: 
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask 
timed out on [Actor[akka://flink/user/jobmanager_1#172582095]] after [1 
ms]. Sender[null] sent message of type 
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:407)
at 
org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:602)
... 9 more
Caused by: java.util.concurrent.CompletionException: 
akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/jobmanager_1#172582095]] after [1 ms]. 
Sender[null] sent message of type 
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at 
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at 
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at 
java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at 
java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at 
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at 

Re: TaskIOMetricGroup metrics not unregistered in prometheus on job failure ?

2018-06-27 Thread jelmer
Thanks for taking a look. I took the liberty of creating a pull request for
this.

https://github.com/apache/flink/pull/6211

It would be great if you guys could take a look at it and see if it makes
sense. I tried it out on our servers and it seems to do the job


On Tue, 26 Jun 2018 at 18:47, Chesnay Schepler  wrote:

> Great work on debugging this, you're exactly right.
>
> The children we add to the collector have to be removed individually when
> a metric is unregistered.
>
> If the collector is a io.prometheus.client.Gauge we can use the #remove() 
> method.
> For histograms we will have to modify our HistogramSummaryProxy class to
> allow removing individual histograms.
>
> I've filed FLINK-9665 <https://issues.apache.org/jira/browse/FLINK-9665>.
>
> On 26.06.2018 17:28, jelmer wrote:
>
> Hi Chesnay, sorry for the late reply. I did not have time to look into
> this sooner
>
> I did what you suggested. Added some logging to the PrometheusReporter
> like this :
>
>
> https://github.com/jelmerk/flink/commit/58779ee60a8c3961f3eb2c487c603c33822bba8a
>
> And deployed a custom build of the reporter to our test environment.
>
> I managed to reproduce the issue like this
>
> 1. Deploy job A : it lands on worker 1
> 2. Deploy job B : it lands on worker 1, take note of the job id
> 3. Redeploy job b by canceling it from a savepoint and deploying it again
> from the savepoint : it lands on worker 3
> 4. Execute curl -s http://localhost:9249/metrics | grep "job id from step
> 2" on worker 1. The metrics are still exposed even though the job is
> canceled
>
> I attached a piece of the log to the email. What I notice is that the two
> jobs register metrics with the same scoped metric name. In this case
> flink_taskmanager_job_task_buffers_inputQueueLength.
>
> The prometheus exporter seems to use reference counting for the metrics
> and the metrics will only be removed when the count is 0. Canceling job B
> will lower the counter by 5, but because job A is still deployed the count
> does not drop to 0, so the metric never gets unregistered.
>
> Canceling job A will remove the lingering metrics from the old job B
>
> It seems to me that this is a bug and that the children that are being
> added in notifyOfAddedMetric
> <https://github.com/jelmerk/flink/commit/58779ee60a8c3961f3eb2c487c603c33822bba8a#diff-36ff6f170e359d30a1265b43659443bfR163>
> should be removed in notifyOfRemovedMetric.
> Can you confirm this ?
>
>
> --Jelmer
>
>
>
> On Fri, 15 Jun 2018 at 18:01, Chesnay Schepler  wrote:
>
>> I remember that another user reported something similar, but he wasn't
>> using the PrometheusReporter. see
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/JVM-metrics-disappearing-after-job-crash-restart-tt20420.html
>>
>> We couldn't find the cause, but my suspicion was FLINK-8946 which will be
>> fixed in 1.4.3 .
>> You could cherry-pick 8b046fafb6ee77a86e360f6b792e7f73399239bd and see
>> whether this actually caused it.
>>
>> Alternatively, if you can reproduce this it would be immensely helpful if
>> you could modify the PrometheusReporter and log all notifications about
>> added or removed metrics.
>>
>> On 15.06.2018 15:42, Till Rohrmann wrote:
>>
>> Hi,
>>
>> this sounds very strange. I just tried it out locally with a
>> standard metric and the Prometheus metrics seem to be unregistered after
>> the job has reached a terminal state. Thus, it looks as if the standard
>> metrics are properly removed from `CollectorRegistry.defaultRegistry`.
>> Could you check the log files whether they contain anything suspicious
>> about a failed metric deregistration a la `There was a problem
>> unregistering metric`?
>>
>> I've also pulled in Chesnay who knows more about the metric reporters.
>>
>> Cheers,
>> Till
>>
>> On Thu, Jun 14, 2018 at 11:34 PM jelmer  wrote:
>>
>>> Hi
>>>
>>> We are using flink-metrics-prometheus for reporting on apache flink 1.4.2
>>>
>>> And I am looking into an issue where it seems that somehow in some cases
>>> the metrics registered
>>> by org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup
>>> (flink_taskmanager_job_task_buffers_outPoolUsage etc)  are not being
>>> unregistered in prometheus in case of a job restart
>>>
>>> Eventually this seems to cause a java.lang.NoClassDefFoundError:
>>> org/apache/kafka/common/metrics/stats/Rate$1 error when a new version of
>>> the job is deployed, because the jar file
>>> in /tmp/blobStore-foo/job_bar/blob_p-baz-qux has been removed upon
>>> deployment of the new job but the URL classloader still points to it and
>>> cannot find stats/Rate$1 (code synthetically generated by the java
>>> compiler because it's a switch on an enum).
>>>
>>> Has anybody come across this issue? Has it possibly been fixed in 1.5?
>>> Can somebody give any pointers as to where to look to tackle this?
>>>
>>> The attached screenshot shows the classloader that cannot be garbage
>>> collected, with the GC root.
>>>
>>>
>>
>


Re: TaskIOMetricGroup metrics not unregistered in prometheus on job failure ?

2018-06-26 Thread jelmer
Hi Chesnay, sorry for the late reply. I did not have time to look into this
sooner

I did what you suggested. Added some logging to the PrometheusReporter like
this :

https://github.com/jelmerk/flink/commit/58779ee60a8c3961f3eb2c487c603c33822bba8a

And deployed a custom build of the reporter to our test environment.

I managed to reproduce the issue like this

1. Deploy job A : it lands on worker 1
2. Deploy job B : it lands on worker 1, take note of the job id
3. Redeploy job B by canceling it with a savepoint and deploying it again
from the savepoint : it lands on worker 3
4. Execute curl -s http://localhost:9249/metrics | grep "job id from step
2" on worker 1. The metrics are still exposed even though the job is
canceled

I attached a piece of the log to the email. What I notice is that the two
jobs register metrics with the same scoped metric name. In this case
flink_taskmanager_job_task_buffers_inputQueueLength.

The prometheus exporter seems to use reference counting for the metrics and
the metrics will only be removed when the count is 0. Canceling job B will
lower the counter by 5, but because job A is still deployed the count does
not drop to 0, so the metric never gets unregistered.

Canceling job A will remove the lingering metrics from the old job B

It seems to me that this is a bug and that the children that are being added
in notifyOfAddedMetric
<https://github.com/jelmerk/flink/commit/58779ee60a8c3961f3eb2c487c603c33822bba8a#diff-36ff6f170e359d30a1265b43659443bfR163>
should be removed in notifyOfRemovedMetric.
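
In other words, something along these lines (purely a sketch of the idea
with hypothetical helper names, not the actual reporter code):

// hypothetical sketch: collectors, scopedMetricName, labelValuesFor and
// decrementReferenceCount stand in for the reporter's real bookkeeping
override def notifyOfRemovedMetric(metric: Metric, metricName: String, group: MetricGroup): Unit = {
  val scopedName = scopedMetricName(group, metricName)
  collectors.get(scopedName).foreach {
    case gauge: io.prometheus.client.Gauge =>
      gauge.remove(labelValuesFor(group): _*)   // drop only this job's child
    case _ =>
      // histograms would need HistogramSummaryProxy to support removing one child
  }
  if (decrementReferenceCount(scopedName) == 0) {
    io.prometheus.client.CollectorRegistry.defaultRegistry.unregister(collectors(scopedName))
  }
}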

Can you confirm this ?


--Jelmer



On Fri, 15 Jun 2018 at 18:01, Chesnay Schepler  wrote:

> I remember that another user reported something similar, but he wasn't
> using the PrometheusReporter. see
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/JVM-metrics-disappearing-after-job-crash-restart-tt20420.html
>
> We couldn't find the cause, but my suspicion was FLINK-8946 which will be
> fixed in 1.4.3 .
> You could cherry-pick 8b046fafb6ee77a86e360f6b792e7f73399239bd and see
> whether this actually caused it.
>
> Alternatively, if you can reproduce this it would be immensely helpful if
> you could modify the PrometheusReporter and log all notifications about
> added or removed metrics.
>
> On 15.06.2018 15:42, Till Rohrmann wrote:
>
> Hi,
>
> this sounds very strange. I just tried it out locally with a standard
> metric and the Prometheus metrics seem to be unregistered after the job has
> reached a terminal state. Thus, it looks as if the standard metrics are
> properly removed from `CollectorRegistry.defaultRegistry`. Could you check
> the log files whether they contain anything suspicious about a failed
> metric deregistration a la `There was a problem unregistering metric`?
>
> I've also pulled in Chesnay who knows more about the metric reporters.
>
> Cheers,
> Till
>
> On Thu, Jun 14, 2018 at 11:34 PM jelmer  wrote:
>
>> Hi
>>
>> We are using flink-metrics-prometheus for reporting on apache flink 1.4.2
>>
>> And I am looking into an issue where it seems that somehow in some cases
>> the metrics registered
>> by org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup
>> (flink_taskmanager_job_task_buffers_outPoolUsage etc)  are not being
>> unregistered in prometheus in case of a job restart
>>
>> Eventually this seems to cause a java.lang.NoClassDefFoundError:
>> org/apache/kafka/common/metrics/stats/Rate$1 error when a new version of
>> the job is deployed, because the jar file
>> in /tmp/blobStore-foo/job_bar/blob_p-baz-qux has been removed upon
>> deployment of the new job but the URL classloader still points to it and
>> cannot find stats/Rate$1 (code synthetically generated by the java
>> compiler because it's a switch on an enum).
>>
>> Has anybody come across this issue? Has it possibly been fixed in 1.5?
>> Can somebody give any pointers as to where to look to tackle this?
>>
>> The attached screenshot shows the classloader that cannot be garbage
>> collected, with the GC root.
>>
>>
>
2018-06-26 15:21:41.602 [marshalled-with-bot-header-for-dev -> Sink: 
kafka-sink-for-dev (1/1)] INFO  
org.apache.flink.metrics.prometheus.PrometheusReporter  - Metric added with 
metricName inputQueueLength and scopedMetricName 
flink_taskmanager_job_task_buffers_inputQueueLength and metricGroup scope 
components 
flinkworker001,taskmanager,odin-router-v2,marshalled-with-bot-header-for-dev -> 
Sink: kafka-sink-for-dev,0,buffers and metric group variables 
{=58f009823d425ec30a95fcab9b91929f, 
=cc2765d0460902a818200cb341f809c2, 
=d00c5dc5565d99849c4d996057cdf99f, =flinkworker001, 
=marshalled-with-bot-header-for-dev -> Sink: kafka-sink-for-dev, 
=0, =odin-rou

Re: pre-initializing global window state

2018-05-07 Thread jelmer
Hi Ken

> 1. I would first try using RocksDB with incremental checkpointing
<https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html>,
before deciding that an alternative approach is required.

That would reduce the size of the checkpoints but, as far as I know, not the
savepoints. If I understand correctly, a savepoint is still a copy of the
entire system state, so deploying a new version of the application would be
a daunting proposition involving saving many gigabytes of data to external
storage, and restarts that would take a very long time. Also, our flink
workers are not really scoped for these kinds of storage requirements.

> 2 Have you considered using queryable state vs. also keeping the list of
events in Cassandra?

We looked at it before, and at the time it was still somewhat experimental
and somewhat immature with regard to handling failure scenarios. It would
also require all the state to reside in flink, which would again lead to
long restarts when creating savepoints.

> 3. Depending on what you need the list of events for, often you can apply
a streaming algorithm to get good-enough (approximate) results without
storing complete state.

HyperLogLog and friends make a lot of sense in many scenarios, but
unfortunately this is not one of them. :-(



Another alternative I thought of for this problem is to sidestep the window
abstraction and fall back to a process function with timers.
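
Something along these lines (a rough sketch only; Event, the TTL and
loadEventsFromCassandra are hypothetical placeholders, not our actual code):

import java.lang.{Long => JLong}

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

case class Event(userId: String, payload: String)   // placeholder event type

class LastNEvents(n: Int, inactivityTtlMs: Long)
    extends ProcessFunction[Event, Seq[Event]] {

  @transient private var events: ListState[Event] = _
  @transient private var lastSeen: ValueState[JLong] = _

  override def open(parameters: Configuration): Unit = {
    events = getRuntimeContext.getListState(
      new ListStateDescriptor[Event]("last-n-events", classOf[Event]))
    lastSeen = getRuntimeContext.getState(
      new ValueStateDescriptor[JLong]("last-seen", classOf[JLong]))
  }

  override def processElement(value: Event,
      ctx: ProcessFunction[Event, Seq[Event]]#Context,
      out: Collector[Seq[Event]]): Unit = {
    val existing = events.get()
    var buffered =
      if (existing == null) Vector.empty[Event] else existing.asScala.toVector
    if (buffered.isEmpty) {
      // state was evicted earlier (or never existed): re-initialize from cassandra
      buffered = loadEventsFromCassandra(value.userId)   // hypothetical helper
    }
    buffered = (buffered :+ value).takeRight(n)
    events.clear()
    buffered.foreach(events.add)
    out.collect(buffered)

    // schedule eviction of the RocksDB state after a period of inactivity;
    // cassandra keeps the authoritative copy, so it can always be reloaded
    val now = ctx.timerService().currentProcessingTime()
    lastSeen.update(now)
    ctx.timerService().registerProcessingTimeTimer(now + inactivityTtlMs)
  }

  override def onTimer(timestamp: Long,
      ctx: ProcessFunction[Event, Seq[Event]]#OnTimerContext,
      out: Collector[Seq[Event]]): Unit = {
    val last = lastSeen.value()
    if (last != null && timestamp >= last + inactivityTtlMs) {
      events.clear()
      lastSeen.clear()
    }
  }

  private def loadEventsFromCassandra(userId: String): Vector[Event] =
    Vector.empty   // stub for the actual cassandra lookup
}

// usage: stream.keyBy(_.userId).process(new LastNEvents(1000, 6 * 60 * 60 * 1000L))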


On 7 May 2018 at 17:31, Ken Krugler <kkrugler_li...@transpac.com> wrote:

> Hi Jelmer,
>
> Three comments, if I understand your use case correctly…
>
> 1. I would first try using RocksDB with incremental checkpointing
> <https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html>,
> before deciding that an alternative approach is required.
>
> 2. Have you considered using queryable state
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html>
>  vs.
> also keeping the list of events in Cassandra?
>
> 3. Depending on what you need the list of events for, often you can apply
> a streaming algorithm to get good-enough (approximate) results without
> storing complete state.
>
> — Ken
>
>
> On May 7, 2018, at 5:29 AM, jelmer <jkupe...@gmail.com> wrote:
>
> Hi I am looking for some advice on how to solve the following problem
>
> I'd like to keep track of the all-time last n events received for a user.
> An event on average takes up 500 bytes and there will be tens of millions
> of users for which we need to keep this information. The list of events
> will be stored in cassandra for serving by an API.
>
> One way I can think of to implement this , is to use a global window per
> user with a count evictor.
>
> The problem I see with this is that the state would forever remain on the
> worker nodes, in our case, in rocks db.
>
> This means there would be a *lot* of state to include for savepoints and
> checkpoints. This would make such a job very unwieldy to operate.
>
> Is it possible to evict the global window state after some period of
> inactivity, and then reinitialize it with data loaded from cassandra when
> a new event arrives?
>
> Or is there an obvious better way to tackle this problem that i am missing
>
> Any pointers would be greatly appreciated
>
>
>
> 
> http://about.me/kkrugler
> +1 530-210-6378
>
>


pre-initializing global window state

2018-05-07 Thread jelmer
Hi, I am looking for some advice on how to solve the following problem.

I'd like to keep track of the all-time last n events received for a user.
An event on average takes up 500 bytes and there will be tens of millions
of users for which we need to keep this information. The list of events
will be stored in cassandra for serving by an API.

One way I can think of to implement this is to use a global window per
user with a count evictor.
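
i.e. roughly something like this (Event, stream and n stand in for the real
types and values):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

case class Event(userId: String)
val n = 1000
val stream: DataStream[Event] = ???   // the incoming events

val lastN: DataStream[Seq[Event]] = stream
  .keyBy(_.userId)
  .window(GlobalWindows.create())
  .trigger(CountTrigger.of(1))        // fire on every new event ...
  .evictor(CountEvictor.of(n))        // ... and keep only the last n per user
  .apply { (key: String, w: GlobalWindow, elements: Iterable[Event], out: Collector[Seq[Event]]) =>
    out.collect(elements.toSeq)       // written to cassandra by a downstream sink
  }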

The problem I see with this is that the state would forever remain on the
worker nodes, in our case in RocksDB.

This means there would be a *lot* of state to include for savepoints and
checkpoints. This would make such a job very unwieldy to operate.

Is it possible to evict the global window state after some period of
inactivity, and then reinitialize it with data loaded from cassandra when a
new event arrives?

Or is there an obvious better way to tackle this problem that I am missing?

Any pointers would be greatly appreciated


Outputting the content of in flight session windows

2018-04-18 Thread jelmer
I defined a session window and I would like to write the contents of the
window to storage before the window closes

Initially I was doing this by setting a CountTrigger.of(1) on the session
window. But this leads to very frequent writes.

To remedy this I switched to
a ContinuousProcessingTimeTrigger.of(Time.minutes(1)), but this does not
seem to perform well, and if I understand it correctly it will fire even if
no new elements have been added to the window since the last fire.

I then proceeded to create my own trigger implementation that aims to fire
at most once every minute (processing time).
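
The intent is roughly the following (only a sketch of the idea, not my
actual implementation, so it may well behave differently):

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.Window

// Fires at most once per `intervalMs` of processing time, and only if at
// least one element arrived since the last firing.
class ThrottledTrigger[W <: Window](intervalMs: Long) extends Trigger[Any, W] {

  private val pendingDesc =
    new ValueStateDescriptor[java.lang.Long]("pending-fire-time", classOf[java.lang.Long])

  override def onElement(element: Any, timestamp: Long, window: W,
      ctx: Trigger.TriggerContext): TriggerResult = {
    val pending = ctx.getPartitionedState(pendingDesc)
    if (pending.value() == null) {
      // no fire scheduled yet for this window: schedule one interval ahead
      val fireAt = ctx.getCurrentProcessingTime + intervalMs
      pending.update(fireAt)
      ctx.registerProcessingTimeTimer(fireAt)
    }
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: W,
      ctx: Trigger.TriggerContext): TriggerResult = {
    // clear the marker so the next element schedules a new timer
    ctx.getPartitionedState(pendingDesc).clear()
    TriggerResult.FIRE
  }

  override def onEventTime(time: Long, window: W,
      ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

  override def canMerge(): Boolean = true

  override def onMerge(window: W, ctx: Trigger.OnMergeContext): Unit = {
    // keep it simple when session windows merge: drop the marker and let the
    // next element schedule a fresh timer for the merged window
    ctx.getPartitionedState(pendingDesc).clear()
  }

  override def clear(window: W, ctx: Trigger.TriggerContext): Unit =
    ctx.getPartitionedState(pendingDesc).clear()
}

// usage: .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
//        .trigger(new ThrottledTrigger(60 * 1000L))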

This works in simple test scenarios on my developer machine but somehow
fails in production.

So I have two questions:

1. What would be the best way to periodically output the content of a
session window?
2. Can anyone come up with a plausible reason why my custom trigger never
fires?


Re: Task manager not able to rejoin job manager after network hiccup

2018-02-24 Thread jelmer
I don't think it's entirely the same thing. It seems that, by design, once
a worker misses a heartbeat for whatever reason, be it a network hiccup or
a long stop-the-world garbage collect, it gets quarantined and it will not
recover from that until it is restarted.

Which is what the post by Till in the thread you linked seems to indicate.

I assumed that a system like flink would be able to recover from this, and
that if it does not, it's a bug.

Your problem seems to be that for some reason flink misses the heartbeats
under heavy load.

I just simulated missing a heartbeat by blocking traffic to the job manager.




On 24 February 2018 at 15:57, ashish pok <ashish...@yahoo.com> wrote:

> We see the same in 1.4. I don't think we could see this in 1.3. I had
> started a thread a while back on this. Till asked for more details. I
> haven't had a chance to get back to him on this. If you can repro this
> easily perhaps you can get to it faster. I will find the thread and resend.
>
> Thanks,
>
> -- Ashish
>
> On Fri, Feb 23, 2018 at 9:56 AM, jelmer
> <jkupe...@gmail.com> wrote:
> We found out there's a taskmanager.exit-on-fatal-akka-error property that
> will restart flink in this situation but it is not enabled by default and
> that feels like a rather blunt tool. I expect systems like this to be more
> resilient to this
>
> On 23 February 2018 at 14:42, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> @Till Is this the expected behaviour or do you suspect something could be
> going wrong?
>
>
> On 23. Feb 2018, at 08:59, jelmer <jkupe...@gmail.com> wrote:
>
> We've observed on our flink 1.4.0 setup that if for some reason the
> networking between the task manager and the job manager gets disrupted then
> the task manager is never able to reconnect.
>
> You'll end up with messages like this getting printed to the log repeatedly
>
> Trying to register at JobManager akka.tcp://flink@jobmanager: 
> 6123/user/jobmanager (attempt 17, timeout: 3 milliseconds)
> Quarantined address [akka.tcp://flink@jobmanager: 6123] is still unreachable 
> or has not been restarted. Keeping it quarantined.
>
>
> Or alternatively
>
>
> Tried to associate with unreachable remote address 
> [akka.tcp://flink@jobmanager: 6123]. Address is now gated for 5000 ms, all 
> messages to this address will be delivered to dead letters. Reason: [The 
> remote system has quarantined this system. No further associations to the 
> remote system are possible until this system is restarted.
>
>
> But it never recovers until you either restart the job manager or the task
> manager
>
> I was able to successfully reproduce this behaviour in two docker
> containers here :
>
> https://github.com/jelmerk/flink-worker-not-rejoining
>
> Has anyone else seen this problem ?
>
>
>
>
>
>
>
>
>
>


Re: Task manager not able to rejoin job manager after network hiccup

2018-02-23 Thread jelmer
We found out there's a taskmanager.exit-on-fatal-akka-error property that
will restart flink in this situation, but it is not enabled by default and
it feels like a rather blunt tool. I would expect a system like this to be
more resilient.
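
For reference, enabling it is (if I read the config options correctly) just
this line in flink-conf.yaml:

taskmanager.exit-on-fatal-akka-error: true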

On 23 February 2018 at 14:42, Aljoscha Krettek <aljos...@apache.org> wrote:

> @Till Is this the expected behaviour or do you suspect something could be
> going wrong?
>
>
> On 23. Feb 2018, at 08:59, jelmer <jkupe...@gmail.com> wrote:
>
> We've observed on our flink 1.4.0 setup that if for some reason the
> networking between the task manager and the job manager gets disrupted then
> the task manager is never able to reconnect.
>
> You'll end up with messages like this getting printed to the log repeatedly
>
> Trying to register at JobManager 
> akka.tcp://flink@jobmanager:6123/user/jobmanager (attempt 17, timeout: 3 
> milliseconds)
> Quarantined address [akka.tcp://flink@jobmanager:6123] is still unreachable 
> or has not been restarted. Keeping it quarantined.
>
>
> Or alternatively
>
>
> Tried to associate with unreachable remote address 
> [akka.tcp://flink@jobmanager:6123]. Address is now gated for 5000 ms, all 
> messages to this address will be delivered to dead letters. Reason: [The 
> remote system has quarantined this system. No further associations to the 
> remote system are possible until this system is restarted.
>
>
> But it never recovers until you either restart the job manager or the task
> manager
>
> I was able to successfully reproduce this behaviour in two docker
> containers here :
>
> https://github.com/jelmerk/flink-worker-not-rejoining
>
> Has anyone else seen this problem ?
>
>
>
>
>
>
>
>
>


Task manager not able to rejoin job manager after network hiccup

2018-02-23 Thread jelmer
We've observed on our flink 1.4.0 setup that if for some reason the
networking between the task manager and the job manager gets disrupted,
the task manager is never able to reconnect.

You'll end up with messages like this getting printed to the log repeatedly:

Trying to register at JobManager
akka.tcp://flink@jobmanager:6123/user/jobmanager (attempt 17, timeout:
3 milliseconds)
Quarantined address [akka.tcp://flink@jobmanager:6123] is still
unreachable or has not been restarted. Keeping it quarantined.


Or alternatively


Tried to associate with unreachable remote address
[akka.tcp://flink@jobmanager:6123]. Address is now gated for 5000 ms,
all messages to this address will be delivered to dead letters.
Reason: [The remote system has quarantined this system. No further
associations to the remote system are possible until this system is
restarted.


But it never recovers until you either restart the job manager or the task
manager

I was able to successfully reproduce this behaviour in two docker
containers here :

https://github.com/jelmerk/flink-worker-not-rejoining

Has anyone else seen this problem ?


Re: How to make savepoints more robust in the face of refactorings ?

2018-01-30 Thread jelmer
I looked into it a little more. The anonymous-classed serializer is being
created here

https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java#L1247

So far the only strategy I've found for making it less likely to break is
defining the TypeInformation in a trait like the one below and mixing it in
to the operators:


import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._   // provides the createTypeInformation macro

trait Tuple2TypeInformation {
  // the macro expands here, so the generated serializer class is nested under this trait
  implicit val tuple2TypeInformation: TypeInformation[(String, Int)] =
    createTypeInformation[(String, Int)]
}

Then the inner class that's generated will be something like
Tuple2TypeInformation$$anon$2$$anon$1 instead of
com.ecg.foo.Main$Operators$$anon$3$$anon$1, and as long as you don't rename
this Tuple2TypeInformation, everything will keep working... but it feels
very suboptimal.
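
For example (Operators and addOne are just illustrative names, not our real
code):

import org.apache.flink.streaming.api.scala._

object Operators extends Tuple2TypeInformation {
  // map needs an implicit TypeInformation[(String, Int)]; the idea is that
  // the monomorphic tuple2TypeInformation from the trait wins over the
  // createTypeInformation macro, so the generated serializer class is
  // anchored to the trait's name instead of to this object
  def addOne(stream: DataStream[(String, Int)]): DataStream[(String, Int)] =
    stream.map(t => (t._1, t._2 + 1))
}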



On 29 January 2018 at 12:33, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi,
>
> In the Scala API, type serializers may be anonymous classes generated by
> Scala macros, and would therefore contain a reference to the wrapping class
> (i.e., your `Operators` class).
> Since Flink currently serializes serializers into the savepoint to be used
> for deserialization on restore, and the fact that they must be present at
> restore time, changing the `Operators` classname would result in the
> previous anonymous class serializer to no longer be in the classpath and
> therefore fails the deserialization of the written serializer.
> This is a limitation caused by how registering serializers for written
> state currently works in Flink.
>
> Generally speaking, to overcome this, you would need to have the previous
> serializer class still around in the classpath when restoring, and can only
> be completely removed from user code once the migration is completed.
>
> One thing that I’m not completely certain with yet, is where in your
> demonstrated code a anonymous-classed serializer is generated for some type.
> From what I see, there shouldn’t be any anonymous-class serializers for
> the code. Is the code you provided a “simplified” version of the actual
> code in which you observed the restore error?
>
> Cheers,
> Gordon
>
>
> On 28 January 2018 at 6:00:32 PM, jelmer (jkupe...@gmail.com) wrote:
>
> Changing the class operators are nested in can break compatibility with
> existing savepoints. The following piece of code demonstrates this
>
> https://gist.github.com/jelmerk/e55abeb0876539975cd32ad0ced8141a
>
> If I change Operators in this file to Operators2  i will not be able to
> recover from a savepoint that was made  when this class still had its old
> name.
>
> The error in the flink ui will be
>
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.
> initKeyedState(AbstractStreamOperator.java:293)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.
> initializeState(AbstractStreamOperator.java:225)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeOperators(StreamTask.java:692)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeState(StreamTask.java:679)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMet
> aInfo.(RegisteredKeyedBackendStateMetaInfo.java:53)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$
> RocksDBFullRestoreOperation.restoreKVStateMetaData(
> RocksDBKeyedStateBackend.java:1216)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$
> RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(
> RocksDBKeyedStateBackend.java:1153)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$
> RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.
> restore(RocksDBKeyedStateBackend.java:1034)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> createKeyedStateBackend(StreamTask.java:773)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.
> initKeyedState(AbstractStreamOperator.java:283)
> ... 6 more
>
> But the real reason is found in the task manager logs
>
>
> 2018-01-28 17:03:58,830 WARN  org.apache.flink.api.common.typeutils.
> TypeSerializerSerializationUtil  - Deserialization of serializer errored;
> replacing with null.
> java.io.IOException: Unloadable class for type serializer.
> at or

Re: Starting a job that does not use checkpointing from a savepoint is broken ?

2018-01-18 Thread jelmer
Hey Eron,

Thanks, you stated the issue better and more compactly than I could.

I will not debate the wisdom of not using checkpoints, but when migrating
jobs you may not be aware whether a job has checkpointing enabled if you
are not the author, and if you follow the upgrade guide to the letter you
end up seriously breaking the job.

Somewhere something is wrong, be it in the documentation or the
implementation.


On 19 January 2018 at 02:05, Eron Wright <eronwri...@gmail.com> wrote:

> To restate the issue:
> When checkpointing is disabled, the Flink Kafka Consumer relies on the
> periodic offsets that are committed to the broker by the internal Kafka
> client.  Such a job would, upon restart, continue from the committed
> offsets.   However, in the situation that the job is restored from a
> savepoint, then the offsets within the savepoint supercede the broker-based
> offsets.
>
> It seems a bit unusual to use the savepoint feature on a job that doesn't
> have checkpointing enabled.  Makes me wonder whether
> `StreamExecutionEnvironment::enableCheckpointing`, is best understood as
> enabling +periodic+ checkpointing.
>
> The docs say that the periodic offset commit feature is not intended for
> fault tolerance, implying to me that you should use Flink's checkpointing
> feature.  A great reason to use Flink checkpointing is to capture the
> intermediate state of the job, such as window state, in addition to the
> consumer offsets.
>
> I hope this helps,
> Eron
>
>
>
>
>
> On Thu, Jan 18, 2018 at 3:26 PM, jelmer <jkupe...@gmail.com> wrote:
>
>> I ran into a rather annoying issue today while upgrading a  flink jobs
>> from flink 1.3.2 to 1.4.0
>>
>> This particular job does not use checkpointing not state.
>>
>> I followed the instructions at https://ci.apache.org/projects
>> /flink/flink-docs-release-1.4/ops/upgrading.html
>>
>> First created a savepoint, upgraded the cluster, then restarted the job
>> from the savepoint.
>>
>> This all went well until later a few hours later one of our kafka nodes
>> dies.This triggered an exception in the job which was subsequently
>> restarted.
>>
>> However instead of picking up where it left off based on the offsets
>> comitted to kafka (which is what should happen according to
>> https://ci.apache.org/projects/flink/flink-docs-release-
>> 1.4/dev/connectors/kafka.html)  the kafka offsets where reset to the
>> point when i made the savepoint 3 hours earlier and so it started
>> reprocessing millions of messages.
>>
>> Needless to say that creating a savepoint for a job without state or
>> checkpoints does not make that much sense. But I would not expect a restart
>> from a savepoint to completely break a job in the case of failure.
>>
>> I created a repository that reproduces the scenario I encountered
>>
>> https://github.com/jelmerk/flink-cancel-restart-job-without-checkpointing
>>
>> Am I misunderstanding anything or should i file a bug for this ?
>>
>>
>>
>


Starting a job that does not use checkpointing from a savepoint is broken ?

2018-01-18 Thread jelmer
I ran into a rather annoying issue today while upgrading a flink job from
flink 1.3.2 to 1.4.0.

This particular job does not use checkpointing nor state.

I followed the instructions at
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/upgrading.html

First I created a savepoint, upgraded the cluster, then restarted the job
from the savepoint.

This all went well until a few hours later, when one of our kafka nodes
died. This triggered an exception in the job, which was subsequently
restarted.

However, instead of picking up where it left off based on the offsets
committed to kafka (which is what should happen according to
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html),
the kafka offsets were reset to the point when I made the savepoint 3
hours earlier, and so it started reprocessing millions of messages.
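
For reference, the moving parts on the consumer side look roughly like this
(topic, properties and the 0.11 connector are placeholders for our actual
setup):

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

val env = StreamExecutionEnvironment.getExecutionEnvironment
// env.enableCheckpointing(60000)   // deliberately not enabled in this job

val props = new Properties()
props.setProperty("bootstrap.servers", "kafka:9092")
props.setProperty("group.id", "my-job")

val consumer = new FlinkKafkaConsumer011[String]("events", new SimpleStringSchema(), props)
consumer.setStartFromGroupOffsets()           // used on a fresh start, but ignored when the
                                              // job is restored from a savepoint/checkpoint
consumer.setCommitOffsetsOnCheckpoints(true)  // only has an effect once checkpointing is on

env.addSource(consumer)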

Needless to say, creating a savepoint for a job without state or
checkpoints does not make that much sense. But I would not expect a restart
from a savepoint to completely break a job in the case of failure.

I created a repository that reproduces the scenario I encountered

https://github.com/jelmerk/flink-cancel-restart-job-without-checkpointing

Am I misunderstanding anything, or should I file a bug for this?


Re: Failed to serialize remote message [class org.apache.flink.runtime.messages.JobManagerMessages$JobFound

2018-01-16 Thread jelmer
I think I found the issue. I'd just like to verify that my reasoning is
correct.

We had the following keys in our flink-conf.yaml

jobmanager.web.address: localhost
jobmanager.web.port: 8081

This worked on flink 1.3.2

But on flink 1.4.0 this check

https://github.com/apache/flink/blob/32770103253e01cd61c8634378cfa1b26707e19a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRedirectUtils.java#L62

Will make it so that both the master and the standby think that they don't
need to perform a redirect, which means that the standby node will serve
web traffic.
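
Presumably the way to avoid this is to point jobmanager.web.address at each
machine's own routable address instead of localhost, e.g. (hostname is a
placeholder):

jobmanager.web.address: flinkmaster001.internal
jobmanager.web.port: 8081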

I am assuming that it is intended that this never happens (because it will
call remote actor systems), so this class not being serializable is not a
bug.





On 16 January 2018 at 14:51, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi,
>
> this indeed indicates that a REST handler is requesting the ExecutionGraph
> from a JobManager which does not run in the same ActorSystem. Could you
> please tell us the exact HA setup. Are your running Flink on Yarn with HA
> or do you use standalone HA with standby JobManagers?
>
> It would be really helpful if you could also share the logs with us.
>
> Cheers,
> Till
>
> On Tue, Jan 16, 2018 at 10:20 AM, Nico Kruber <n...@data-artisans.com>
> wrote:
>
>> IMHO, this looks like a bug and it makes sense that you only see this
>> with an HA setup:
>>
>> The JobFound message contains the ExecutionGraph which, however, does
>> not implement the Serializable interface. Without HA, when browsing the
>> web interface, this message is (probably) not serialized since it is
>> only served to you via HTML. For HA, this may come from another
>> JobManager than the Web interface you are browsing.
>> I'm including Till (cc'd) as he might know more.
>>
>>
>> Nico
>>
>> On 16/01/18 09:22, jelmer wrote:
>> > HI,
>> >
>> > We recently upgraded our test environment from flink 1.3.2 to flink
>> > 1.4.0.
>> >
>> > We are using a high availability setup on the job manager. And now often
>> > when I go to the job details in the web ui the call will timeout and the
>> > following error will pop up in the job manager log
>> >
>> >
>> > akka.remote.MessageSerializer$SerializationException: Failed to
>> > serialize remote message [class
>> > org.apache.flink.runtime.messages.JobManagerMessages$JobFound] using
>> > serializer [class akka.serialization.JavaSerializer].
>> > at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:61)
>> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at
>> > akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply
>> (Endpoint.scala:889)
>> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at
>> > akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply
>> (Endpoint.scala:889)
>> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:888)
>> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.remote.EndpointWriter.writeSend(Endpoint.scala:780)
>> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.
>> scala:755)
>> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446)
>> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> > [flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> > [flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> > [flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> > [flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> > [flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> > [flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at
>> > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
>> ForkJoinPool.java:1339)
>> > [flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at
>> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
>> l.java:1979)
>> > [flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at
>> >

Failed to serialize remote message [class org.apache.flink.runtime.messages.JobManagerMessages$JobFound

2018-01-16 Thread jelmer
HI,

We recently upgraded our test environment from flink 1.3.2 to flink
1.4.0.

We are using a high availability setup on the job manager. And now, often
when I go to the job details in the web UI, the call will time out and the
following error will pop up in the job manager log:


akka.remote.MessageSerializer$SerializationException: Failed to serialize
remote message [class
org.apache.flink.runtime.messages.JobManagerMessages$JobFound] using
serializer [class akka.serialization.JavaSerializer].
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:61)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:889)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:889)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:888)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:780)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:755)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
[flink-dist_2.11-1.4.0.jar:1.4.0]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dist_2.11-1.4.0.jar:1.4.0]
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dist_2.11-1.4.0.jar:1.4.0]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dist_2.11-1.4.0.jar:1.4.0]
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dist_2.11-1.4.0.jar:1.4.0]
Caused by: java.io.NotSerializableException:
org.apache.flink.runtime.executiongraph.ExecutionGraph
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
~[na:1.8.0_131]
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
~[na:1.8.0_131]
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
~[na:1.8.0_131]
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
~[na:1.8.0_131]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
~[na:1.8.0_131]
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
~[na:1.8.0_131]
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:321)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:321)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:321)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:321)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:47)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
... 17 common frames omitted



I isolated it further, and it seems to be triggered by this call:

https://hostname/jobs/28076fffbcf7eab3f17900a54cc7c41d

I cannot reproduce it on my local laptop without an HA setup.
Before I dig any deeper, has anyone already come across this?


BucketingSink broken in flink 1.4.0 ?

2018-01-10 Thread jelmer
Hi, I am trying to convert some jobs from flink 1.3.2 to flink 1.4.0.

But I am running into the issue that the bucketing sink will always try to
connect to hdfs://localhost:12345/ instead of the hdfs URL I have specified
in the constructor.

If I look at the code at

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L1125


It tries to create the hadoop filesystem like this

final org.apache.flink.core.fs.FileSystem flinkFs =
org.apache.flink.core.fs.FileSystem.get(path.toUri());
final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem) ?
((HadoopFileSystem) flinkFs).getHadoopFileSystem() : null;

But FileSystem.get will always return a SafetyNetWrapperFileSystem, so the
instanceof check will never indicate that it is a hadoop filesystem.


Am I missing something, or is this a bug? And if so, what would be the
correct fix? I guess replacing FileSystem.get with
FileSystem.getUnguardedFileSystem would fix it, but I am afraid I lack the
context to know whether that would be safe.


queryable state and maintaining all time counts

2017-12-29 Thread jelmer
Hi,

I've been going through various talks on flink's support for queryable
state, like this talk by Jamie Grier at 2016's Flink Forward:

https://www.youtube.com/watch?v=uuv-lnOrD0o

I see how you can easily use this to produce time series data, e.g.
calculate the number of events per hour.

But I am wondering how one would go about using it to also maintain
all-time counts, e.g. count the number of events since the beginning of
time. Is anybody doing this? And if so, what is your strategy?
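
To make the question concrete, the kind of thing I have in mind is keyed
state that is never cleared, exposed via setQueryable (rough sketch only;
Event and userId are placeholders):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

case class Event(userId: String)   // placeholder event type

class AllTimeCount extends RichFlatMapFunction[Event, (String, Long)] {

  @transient private var count: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    val desc = new ValueStateDescriptor[java.lang.Long]("all-time-count", classOf[java.lang.Long])
    desc.setQueryable("all-time-count")   // reachable from a QueryableStateClient
    count = getRuntimeContext.getState(desc)
  }

  override def flatMap(event: Event, out: Collector[(String, Long)]): Unit = {
    val updated = Option(count.value()).map(_.toLong).getOrElse(0L) + 1L
    count.update(updated)                 // never cleared, so it is an all-time count
    out.collect((event.userId, updated))
  }
}

// usage: events.keyBy(_.userId).flatMap(new AllTimeCount)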