Hey Niels,

Flink currently restarts the complete job if you have a restart
strategy configured:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/restart_strategies.html.

I agree that only restarting the required parts of the pipeline is an
important optimization. Flink has not implemented this (fully) yet but
it's on the agenda [1] and work has already started [2].

In this particular case, everything is just slow and we don't need the
restart at all if you give the consumer a higher max timeout.

Please report back when you have more info :-)

– Ufuk

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures

[2] https://issues.apache.org/jira/browse/FLINK-4256

On Thu, Oct 12, 2017 at 10:17 AM, Niels Basjes <ni...@basjes.nl> wrote:
> Hi,
>
> I'm currently doing some tests to see it this info helps.
> I was running a different high CPU task on one of the nodes outside Yarn, so
> I took that one out of the cluster to see if that helps.
>
> What I do find strange that in this kind of error scenario the entire job
> fails.
> I would have expected something similar as with 'good old' MapReduce: The
> missing task is simply resubmitted and ran again.
> Why doesn't that happen?
>
>
> Niels
>
> On Wed, Oct 11, 2017 at 8:49 AM, Ufuk Celebi <u...@apache.org> wrote:
>>
>> Hey Niels,
>>
>> any update on this?
>>
>> – Ufuk
>>
>>
>> On Mon, Oct 9, 2017 at 10:16 PM, Ufuk Celebi <u...@apache.org> wrote:
>> > Hey Niels,
>> >
>> > thanks for the detailed report. I don't think that it is related to
>> > the Hadoop or Scala version. I think the following happens:
>> >
>> > - Occasionally, one of your tasks seems to be extremely slow in
>> > registering its produced intermediate result (the data shuffled
>> > between TaskManagers)
>> > - Another task is already requesting to consume data from this task
>> > but cannot find it (after multiple retries) and it fails the complete
>> > job (your stack trace)
>> >
>> > That happens only occasionally probably due to load in your cluster.
>> > The slow down could have multiple reasons...
>> > - Is your Hadoop cluster resource constrained and the tasks are slow to
>> > deploy?
>> > - Is your application JAR very large and needs a lot of time
>> > downloading?
>> >
>> > We have two options at this point:
>> > 1) You can increase the maximum retries via the config option:
>> > "taskmanager.network.request-backoff.max" The default is 10000
>> > (milliseconds) and specifies what the maximum request back off is [1].
>> > Increasing this to 30000 would give you two extra retries with pretty
>> > long delays (see [1]).
>> >
>> > 2) To be sure that this is really what is happening we could increase
>> > the log level of certain classes and check whether they have
>> > registered their results or not. If you want to do this, I'm more than
>> > happy to provide you with some classes to enable DEBUG logging for.
>> >
>> > What do you think?
>> >
>> > – Ufuk
>> >
>> > DETAILS
>> > =======
>> >
>> > - The TaskManagers produce and consume intermediate results
>> > - When a TaskManager wants to consume a result, it directly queries
>> > the producing TaskManager for it
>> > - An intermediate result becomes ready for consumption during initial
>> > task setup (state DEPLOYING)
>> > - When a TaskManager is slow to register its intermediate result and
>> > the consumer requests the result before it is ready, it can happen
>> > that a requested partition is "not found"
>> >
>> > This is what is also happening here. We retry to request the
>> > intermediate result multiple times with timed backoff [1] and only
>> > fail the request (your stack trace) if the partition is still not
>> > ready although we expect it to be ready (that is there was no failure
>> > at the producing task).
>> >
>> > [1] Starting by default at 100 millis and going up to 10_000 millis by
>> > doubling that time (100, 200, 400, 800, 1600, 3200, 6400, 10000)
>> >
>> >
>> > On Mon, Oct 9, 2017 at 10:51 AM, Niels Basjes <ni...@basjes.nl> wrote:
>> >> Hi,
>> >>
>> >> I'm having some trouble running a java based Flink job in a
>> >> yarn-session.
>> >>
>> >> The job itself consists of reading a set of files resulting in a
>> >> DataStream
>> >> (I use DataStream because in the future I intend to change the file
>> >> with a
>> >> Kafka feed), then does some parsing and eventually writes the data into
>> >> HBase.
>> >>
>> >> Most of the time running this works fine yet sometimes it fails with
>> >> this
>> >> exception:
>> >>
>> >>
>> >> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
>> >> Partition
>> >> 794b5ce385c296b7943fa4c1f072d6b9@13aa7ef02a5d9e0898204ec8ce283363
>> >> not found.
>> >>       at
>> >>
>> >> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:203)
>> >>       at
>> >>
>> >> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:128)
>> >>       at
>> >>
>> >> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:345)
>> >>       at
>> >>
>> >> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1286)
>> >>       at
>> >> org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1123)
>> >>       at
>> >> org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1118)
>> >>       at
>> >>
>> >> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
>> >>       at akka.dispatch.OnComplete.internal(Future.scala:248)
>> >>       at akka.dispatch.OnComplete.internal(Future.scala:245)
>> >>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>> >>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>> >>       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>> >>       at
>> >>
>> >> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>> >>       at
>> >>
>> >> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>> >>       at
>> >>
>> >> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>> >>       at
>> >>
>> >> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>> >>       at
>> >> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>> >>       at
>> >>
>> >> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>> >>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> >>       at
>> >>
>> >> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>> >>       at
>> >> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >>       at
>> >>
>> >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >>       at
>> >> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >>       at
>> >>
>> >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >>
>> >> I went through all logs at the Hadoop side of all the related
>> >> containers and
>> >> other than this exception I did not see any warning/error that might
>> >> explain
>> >> what is going on here.
>> >>
>> >> Now the "Most of the time running this works fine" makes this hard to
>> >> troubleshoot. When I run the same job again it may run perfectly that
>> >> time.
>> >>
>> >> I'm using flink-1.3.2-bin-hadoop27-scala_2.11.tgz and I double checked
>> >> my
>> >> pom.xml and I use the same version for Flink / Scala in there.
>> >>
>> >> The command used to start the yarn-session on my experimental cluster
>> >> (no
>> >> security, no other users):
>> >>
>> >> /usr/local/flink-1.3.2/bin/yarn-session.sh \
>> >>     --container 180 \
>> >>     --name "Flink on Yarn Experiments" \
>> >>     --slots                     1     \
>> >>     --jobManagerMemory          4000  \
>> >>     --taskManagerMemory         4000  \
>> >>     --streaming                       \
>> >>     --detached
>> >>
>> >> Two relevant fragments from my application pom.xml:
>> >>
>> >> <flink.version>1.3.2</flink.version>
>> >> <flink.scala.version>2.11</flink.scala.version>
>> >>
>> >>
>> >>
>> >> <dependency>
>> >>   <groupId>org.apache.flink</groupId>
>> >>   <artifactId>flink-java</artifactId>
>> >>   <version>${flink.version}</version>
>> >> </dependency>
>> >>
>> >> <dependency>
>> >>   <groupId>org.apache.flink</groupId>
>> >>   <artifactId>flink-streaming-java_${flink.scala.version}</artifactId>
>> >>   <version>${flink.version}</version>
>> >> </dependency>
>> >>
>> >> <dependency>
>> >>   <groupId>org.apache.flink</groupId>
>> >>   <artifactId>flink-clients_${flink.scala.version}</artifactId>
>> >>   <version>${flink.version}</version>
>> >> </dependency>
>> >>
>> >> <dependency>
>> >>   <groupId>org.apache.flink</groupId>
>> >>   <artifactId>flink-hbase_${flink.scala.version}</artifactId>
>> >>   <version>${flink.version}</version>
>> >> </dependency>
>> >>
>> >>
>> >> I could really use some suggestions where to look for the root cause of
>> >> this.
>> >> Is this something in my application? My Hadoop cluster? Or is this a
>> >> problem
>> >> in Flink 1.3.2?
>> >>
>> >> Thanks.
>> >>
>> >> --
>> >> Best regards / Met vriendelijke groeten,
>> >>
>> >> Niels Basjes
>
>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes

Reply via email to