Hi Miguel,

Actually, a lot has changed since 1.4.
Flink 1.5 will feature a completely new (cluster) setup and deployment model.
The dev effort is known as FLIP-6 [1].
So it is not unlikely that you discovered a regression.

Would you mind opening a JIRA ticket for the issue?

Thank you very much.

Best,
Fabian

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077


2018-04-21 20:08 GMT+02:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:

> Hello,
>
> Just to provide a brief update: I got this working by moving to the stable
> version 1.4.2.
> I previously tested under 1.5-SNAPSHOT and 1.6-SNAPSHOT and the problem
> occurred in both.
>
> If I'm not mistaken, LocalEnvironment is primarily targeted at
> debugging scenarios.
> In my case, I explicitly want to use it on a complex series of jobs for
> now.
> However, it seems some sort of bug was introduced after 1.4.2?
>
> I ask this because the same code leads to operators stuck on
> java.lang.Thread.State: WAITING in the snapshot versions, but it works
> fine in 1.4.2.
> Was there any specific design change after 1.4.2 regarding the way the
> Flink cluster is simulated (LocalFlinkMiniCluster if I'm not mistaken?)
> when using LocalEnvironment?
>
> I would like to explore this issue and perhaps contribute to fixing it, or
> at least understand it.
>
> Thank you very much.
>
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>
> On 17 April 2018 at 22:52, Miguel Coimbra <miguel.e.coim...@gmail.com>
> wrote:
>
>> Hello James,
>>
>> Thanks for the information.
>> I noticed something suspicious as well: I have chains of operators where
>> the first operator ingests the expected number of records but emits none,
>> leaving the following operator idle in a "RUNNING" state.
>> For example:
>>
>>
>>
>> I will get back if I find out more.
>>
>>
>> Best regards,
>>
>> Miguel E. Coimbra
>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>
>> On 17 April 2018 at 20:59, James Yu <cyu...@gmail.com> wrote:
>>
>>> Miguel, my colleague and I ran into the same problem yesterday.
>>> We were expecting Flink to read 4 inputs from Kafka and write them to
>>> Cassandra, but the operators got stuck after the 1st input was written
>>> into Cassandra.
>>> This is how DAG looks like:
>>> Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Cassandra Sink)
>>> After we disabled automatic chaining (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#task-chaining-and-resource-groups),
>>> all 4 inputs were read from Kafka and written into Cassandra.
>>> We are still figuring out why the chaining causes the blocking.
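>>>
>>> In case it helps, this is roughly the shape of the workaround (a sketch,
>>> not our actual job: the fromElements source and the print sink stand in
>>> for our Kafka source and Cassandra sink):
>>>
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>
>>> public class ChainingWorkaround {
>>>     public static void main(String[] args) throws Exception {
>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>         // Option 1: disable chaining for the whole job.
>>>         env.disableOperatorChaining();
>>>
>>>         DataStream<String> source = env.fromElements("a", "b", "c", "d"); // stand-in source
>>>         source.map(String::toUpperCase)
>>>               .disableChaining() // Option 2: break the chain only at this operator
>>>               .print();          // stand-in sink
>>>
>>>         env.execute("chaining workaround sketch");
>>>     }
>>> }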
>>>
>>>
>>> This is a UTF-8 formatted mail
>>> -----------------------------------------------
>>> James C.-C.Yu
>>> +886988713275
>>>
>>> 2018-04-18 6:57 GMT+08:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>
>>>> Chesnay, following your suggestions I got access to the web interface
>>>> and also took a closer look at the debugging logs.
>>>> I have noticed one problem regarding the web interface port - it keeps
>>>> changing now and then during my Java program's execution.
>>>>
>>>> Not sure if that is due to my program launching several job executions
>>>> sequentially, but that is what happened.
>>>> Since I am accessing the web interface via tunneling, it becomes rather
>>>> cumbersome to keep adapting the tunnel.
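>>>>
>>>> As a workaround I may try pinning the port via the Configuration I pass
>>>> in. A sketch of what I have in mind (I am assuming "rest.port" (FLIP-6)
>>>> and "jobmanager.web.port" (legacy) are the relevant keys; I have not yet
>>>> verified which one the snapshot versions honor):
>>>>
>>>> Configuration conf = new Configuration();
>>>> conf.setInteger("rest.port", 8081);           // FLIP-6 key (assumption)
>>>> conf.setInteger("jobmanager.web.port", 8081); // legacy key (assumption)
>>>> LocalEnvironment lenv = (LocalEnvironment)
>>>>         ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);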
>>>>
>>>> Another particular problem I'm noticing is that this exception
>>>> frequently pops up (debugging with log4j):
>>>>
>>>> 00:17:54,368 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot with slot request id 9055ef473251505dac04c99727106dc9.
>>>> org.apache.flink.util.FlinkException: Slot is being returned to the SlotPool.
>>>>         at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.returnAllocatedSlot(SlotPool.java:1521)
>>>>         at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
>>>>         at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>>>>         at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834)
>>>>         at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155)
>>>>         at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>>>>         at org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1239)
>>>>         at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:946)
>>>>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1588)
>>>>         at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:593)
>>>>         at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>>>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
>>>>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>>>         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
>>>>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>>>>         at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>         at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> I don't know whether Flink's internals intentionally use an exception
>>>> for control flow here, but there are several occurrences of this as time
>>>> goes by.
>>>>
>>>> Regarding my program itself, I've achieved some progress.
>>>> My program runs a series of Flink jobs in sequence, and I need to take
>>>> extra care that no DataSet instance from job *i* is used in an operator
>>>> of job *i + 1*.
>>>> I believe this was generating the waiting scenarios I described in an
>>>> earlier email.
>>>> The bottom line is to be careful about when job executions are actually
>>>> triggered, and to make sure that a DataSet needed by several Flink jobs
>>>> is available, for example, as a file in secondary storage (possibly
>>>> masked as a memory mapping) and is exclusively read from that source.
>>>> This means ensuring the job that originally produces a DataSet (for
>>>> reuse in a later job) assigns it a DataSink writing to secondary
>>>> storage, as sketched below.
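>>>>
>>>> For reference, the pattern now looks roughly like this (a sketch; the
>>>> path and the String elements are placeholders for my actual data):
>>>>
>>>> import org.apache.flink.api.java.DataSet;
>>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>>> import org.apache.flink.core.fs.FileSystem;
>>>>
>>>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>> // Job i: attach a DataSink so the intermediate result is materialized.
>>>> DataSet<String> intermediate = env.fromElements("a", "b"); // placeholder data
>>>> intermediate.writeAsText("/tmp/intermediate", FileSystem.WriteMode.OVERWRITE);
>>>> env.execute("job i"); // the sink triggers the actual write
>>>>
>>>> // Job i + 1: read the result back; no DataSet instance crosses the job boundary.
>>>> DataSet<String> reloaded = env.readTextFile("/tmp/intermediate");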
>>>>
>>>> I'm going to keep digging with this in mind - I will report back if I
>>>> manage to fix everything or if I find a new problem.
>>>>
>>>> Thanks again,
>>>>
>>>>
>>>>
>>>> Miguel E. Coimbra
>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>
>>>> On 16 April 2018 at 10:26, Chesnay Schepler <ches...@apache.org> wrote:
>>>>
>>>>> Ah yes, currently when you use that method the UI is started on a
>>>>> random port. I'm fixing that in this PR
>>>>> <https://github.com/apache/flink/pull/5814> that will be merged
>>>>> today. For now, you can enable logging and search for something along
>>>>> the lines of "http://<host>:<port> was granted leadership".
>>>>>
>>>>> Sorry for the inconvenience.
>>>>>
>>>>> On 16.04.2018 15:04, Miguel Coimbra wrote:
>>>>>
>>>>> Thanks for the suggestions Chesnay, I will try them out.
>>>>>
>>>>> However, I had already tried your suggestion with the flink-runtime-web
>>>>> dependency and nothing happened.
>>>>> If I understood you correctly, adding that dependency to the pom.xml
>>>>> should make the web front-end run when I call the following line?
>>>>>
>>>>> LocalEnvironment lenv = (LocalEnvironment)
>>>>> ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>>
>>>>> I added flink-runtime-web to my pom.xml, recompiled and launched the
>>>>> program, but I simply got "Unable to connect" in my browser (Firefox) on
>>>>> localhost:8081.
>>>>> Performing wget on localhost:8081 resulted in this:
>>>>>
>>>>> $ wget localhost:8081
>>>>> --2018-04-16 12:47:26--  http://localhost:8081/
>>>>> Resolving localhost (localhost)... ::1, 127.0.0.1
>>>>> Connecting to localhost (localhost)|::1|:8081... failed: Connection refused.
>>>>> Connecting to localhost (localhost)|127.0.0.1|:8081... failed: Connection refused.
>>>>>
>>>>> It seems nothing is actually accepting connections on localhost:8081;
>>>>> the connection is refused on both IPv6 and IPv4.
>>>>> I am probably missing some important detail.
>>>>> These are some of my dependencies:
>>>>>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-java</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-core</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-clients_${scala.binary.version}</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-gelly_${scala.binary.version}</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>>
>>>>> Have you managed to get the web front-end in local mode?
>>>>>
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Miguel E. Coimbra
>>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>>
>>>>> On 16 April 2018 at 05:12, Chesnay Schepler <ches...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> The thing with createLocalEnvironmentWithWebUI is that it requires
>>>>>> flink-runtime-web to be on the classpath, which is rarely the case
>>>>>> when running things in the IDE.
>>>>>> It should work fine in the IDE if you add it as a dependency to your
>>>>>> project. This should've been logged as a warning.
>>>>>>
>>>>>> Chaining is unrelated to this issue as join operators are never
>>>>>> chained to one another.
>>>>>> Lambda functions are also not the issue; if they were, the job would
>>>>>> fail much earlier.
>>>>>>
>>>>>> It is reasonable that T3 is blocked if T1 is blocked. T1 gets no
>>>>>> input hence produces no output, which now also blocks T3.
>>>>>>
>>>>>> There are multiple possible explanations I can come up with:
>>>>>> * the preceding operators are blocked on something or *really* slow
>>>>>> * the preceding operators are actually finished, but aren't shutting
>>>>>> down due to an implementation error
>>>>>> * a deadlock in Flink's join logic
>>>>>> * a deadlock in Flink's network stack
>>>>>>
>>>>>> For the first 2 we will have to consult the UI or logs. You said you
>>>>>> were dumping the input DataSets into files, but were they actually 
>>>>>> complete?
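>>>>>>
>>>>>> A quick way to check that (a sketch; counting each DataSet in
>>>>>> isolation triggers a small job of its own):
>>>>>>
>>>>>> long vertexCount = vertices.count();
>>>>>> long edgeCount = originalGraph.getEdges().count();
>>>>>> System.out.println(vertexCount + " vertices, " + edgeCount + " edges");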
>>>>>>
>>>>>> A deadlock in the network stack should appear as all existing
>>>>>> operator threads being blocked.
>>>>>> We can probably rule out a problem with the join logic by removing
>>>>>> the second join and trying again.
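>>>>>>
>>>>>> For example, a first-join-only variant of the selectEdges method you
>>>>>> posted might look like this (a sketch; the method name is just a
>>>>>> placeholder):
>>>>>>
>>>>>> public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdgesFirstJoinOnly(
>>>>>>         final Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
>>>>>>     return vertices
>>>>>>             .joinWithHuge(originalGraph.getEdges())
>>>>>>             .where(0).equalTo(0)
>>>>>>             .with((source, edge) -> edge) // keep only the edge side
>>>>>>             .returns(originalGraph.getEdges().getType());
>>>>>> }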
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 16.04.2018 03:10, Miguel Coimbra wrote:
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> It would seem that the function which is supposed to launch local
>>>>>> mode with the web front-end doesn't start the front-end at all.
>>>>>> If I'm not mistaken, this call is not doing what it is supposed to do:
>>>>>>
>>>>>> LocalEnvironment lenv = (LocalEnvironment)
>>>>>> ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>>>
>>>>>> Regarding the preceding operators, the thread dumps I got were
>>>>>> pointing to a specific set of operations over DataSet instances that
>>>>>> were passed into my function.
>>>>>> Below I show the code segment and mark the lines where threads are
>>>>>> waiting:
>>>>>>
>>>>>> public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(
>>>>>>         final Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
>>>>>>     return vertices
>>>>>>             .joinWithHuge(originalGraph.getEdges())
>>>>>>             .where(0).equalTo(0)
>>>>>>             .with((source, edge) -> edge)   // Thread 1 is blocked here
>>>>>>             .returns(originalGraph.getEdges().getType())
>>>>>>             .join(vertices)
>>>>>>             .where(1).equalTo(0)
>>>>>>             .with((e, v) -> e)              // Thread 3 is blocked here
>>>>>>             .returns(originalGraph.getEdges().getType())
>>>>>>             .distinct(0, 1);
>>>>>> }
>>>>>>
>>>>>> Note: the edge DataSet of originalGraph contains far more elements
>>>>>> than the vertices DataSet, so I believe joinWithHuge is being used
>>>>>> correctly.
>>>>>>
>>>>>> I will try testing with remote (cluster) mode to have access to the
>>>>>> web front-end, but I have some questions for now:
>>>>>>
>>>>>> - They are blocked in different JoinOperator instances that are
>>>>>> chained - is this a result of Flink's default pipelining mechanism?
>>>>>> - Could there be a problem stemming from the fact that they are both
>>>>>> waiting on lambdas?
>>>>>> - I have tried dumping both DataSet variables originalGraph and vertices
>>>>>> into files (the ones being used in this code), and they produced
>>>>>> correct values (non-empty files), so I don't have a clue what the threads
>>>>>> inside Flink's runtime are waiting on.
>>>>>>
>>>>>> Thanks for the help so far, Chesnay.
>>>>>>
>>>>>>
>>>>>> Miguel E. Coimbra
>>>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>>>
>>>>>> ---------- Forwarded message ----------
>>>>>>
>>>>>>> From: Chesnay Schepler <ches...@apache.org>
>>>>>>> To: user@flink.apache.org
>>>>>>> Cc:
>>>>>>> Bcc:
>>>>>>> Date: Sun, 15 Apr 2018 18:54:33 +0200
>>>>>>> Subject: Re: Unsure how to further debug - operator threads stuck on
>>>>>>> java.lang.Thread.State: WAITING
>>>>>>> Hello,
>>>>>>>
>>>>>>> Thread #1-3 are waiting for input, Thread #4 is waiting for the job
>>>>>>> to finish.
>>>>>>>
>>>>>>> To further debug this I would look into what the preceding operators
>>>>>>> are doing, whether they are blocked on something or are emitting records
>>>>>>> (which you can check in the UI/metrics).
>>>>>>>
>>>>>>> On 15.04.2018 18:40, Miguel Coimbra wrote:
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I am running into a situation where the Flink threads responsible
>>>>>>> for my operator execution are all stuck in the WAITING state.
>>>>>>> Before anything else, this is my machine's spec:
>>>>>>>
>>>>>>> Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830  @ 2.13GHz
>>>>>>> GenuineIntel GNU/Linux
>>>>>>> 256 GB RAM
>>>>>>>
>>>>>>> I am running in local mode on a machine with a considerable amount
>>>>>>> of memory, so perhaps that may be triggering some execution edge-case?
>>>>>>>
>>>>>>> Moving on, this is my Java:
>>>>>>>
>>>>>>> openjdk version "1.8.0_151"
>>>>>>> OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
>>>>>>> OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
>>>>>>>
>>>>>>> Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT
>>>>>>> with LocalEnvironment on this large-memory machine, with
>>>>>>> parallelism set to one:
>>>>>>>
>>>>>>> Configuration conf = new Configuration();
>>>>>>> LocalEnvironment lenv = (LocalEnvironment)
>>>>>>> ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>>>> ExecutionEnvironment env = lenv;
>>>>>>> env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
>>>>>>> env.setParallelism(1);
>>>>>>>
>>>>>>> This initializes the execution environment for a series of
>>>>>>> sequential jobs (any data dependency between jobs is flushed to disk
>>>>>>> on job *i* and read back from disk into a DataSet in job *i + 1*).
>>>>>>> To reiterate, I am not launching a Flink cluster, I am just
>>>>>>> executing in local mode from a code base compiled with Maven.
>>>>>>>
>>>>>>> I have tested this program via mvn exec:exec with different values
>>>>>>> of memory (from -Xmx20000m to -Xmx120000m, from 20GB to 120GB) and
>>>>>>> the result is always the same: the process' memory fills up completely 
>>>>>>> and
>>>>>>> then the process' CPU usage drops to 0%.
>>>>>>> This is strange because if it were a lack of memory, I would expect
>>>>>>> an OutOfMemoryError.
>>>>>>>
>>>>>>> I have debugged with IntelliJ IDEA and obtained thread dumps from
>>>>>>> different executions, and realized quite a few operator threads are
>>>>>>> stuck on java.lang.Thread.State: WAITING.
>>>>>>>
>>>>>>> There are four major threads that I find to be in this waiting state.
>>>>>>> The thread dumps I obtained show me where the wait calls originated:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Number 1:*
>>>>>>>
>>>>>>> "CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
>>>>>>>   java.lang.Thread.State: WAITING
>>>>>>>       at java.lang.Object.wait(Object.java:-1)
>>>>>>>       at java.lang.Object.wait(Object.java:502)
>>>>>>>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>>>>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>>>>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>>>>       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>>>>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>       at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>>>>       at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>>>>>>>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>>>>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>>>>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>>>>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>>>>       at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>>
>>>>>>> *Number 2:*
>>>>>>>
>>>>>>> "Join (Join at summaryGraph(SummaryGraphBuilder.java:92))
>>>>>>> (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
>>>>>>>   java.lang.Thread.State: WAITING
>>>>>>>       at java.lang.Object.wait(Object.java:-1)
>>>>>>>       at java.lang.Object.wait(Object.java:502)
>>>>>>>       at org.apache.flink.runtime.io.ne
>>>>>>> twork.partition.consumer.SingleInputGate.getNextBufferOrEven
>>>>>>> t(SingleInputGate.java:522)
>>>>>>>       at org.apache.flink.runtime.io.ne
>>>>>>> twork.partition.consumer.SingleInputGate.getNextBufferOrEven
>>>>>>> t(SingleInputGate.java:491)
>>>>>>>       at org.apache.flink.runtime.io.ne
>>>>>>> twork.api.reader.AbstractRecordReader.getNextRecord(Abstract
>>>>>>> RecordReader.java:86)
>>>>>>>       at org.apache.flink.runtime.io.ne
>>>>>>> twork.api.reader.MutableRecordReader.next(MutableRecordReade
>>>>>>> r.java:47)
>>>>>>>       at org.apache.flink.runtime.opera
>>>>>>> tors.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>       at org.apache.flink.runtime.opera
>>>>>>> tors.util.metrics.CountingMutableObjectIterator.next(Countin
>>>>>>> gMutableObjectIterator.java:36)
>>>>>>>       at org.apache.flink.runtime.opera
>>>>>>> tors.hash.MutableHashTable$ProbeIterator.next(MutableHashTab
>>>>>>> le.java:1929)
>>>>>>>       at org.apache.flink.runtime.opera
>>>>>>> tors.hash.MutableHashTable.processProbeIter(MutableHashTable
>>>>>>> .java:505)
>>>>>>>       at org.apache.flink.runtime.opera
>>>>>>> tors.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>>>>       at org.apache.flink.runtime.opera
>>>>>>> tors.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey
>>>>>>> (ReusingBuildSecondHashJoinIterator.java:122)
>>>>>>>       at org.apache.flink.runtime.opera
>>>>>>> tors.JoinDriver.run(JoinDriver.java:221)
>>>>>>>       at org.apache.flink.runtime.opera
>>>>>>> tors.BatchTask.run(BatchTask.java:503)
>>>>>>>       at org.apache.flink.runtime.opera
>>>>>>> tors.BatchTask.invoke(BatchTask.java:368)
>>>>>>>       at org.apache.flink.runtime.taskm
>>>>>>> anager.Task.run(Task.java:703)
>>>>>>>       at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>> *Number 3:*
>>>>>>>
>>>>>>> "Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5
>>>>>>> tid=0xd75 nid=NA waiting
>>>>>>>   java.lang.Thread.State: WAITING
>>>>>>>       at java.lang.Object.wait(Object.java:-1)
>>>>>>>       at java.lang.Object.wait(Object.java:502)
>>>>>>>       at org.apache.flink.runtime.io.ne
>>>>>>> twork.partition.consumer.SingleInputGate.getNextBufferOrEven
>>>>>>> t(SingleInputGate.java:522)
>>>>>>>       at org.apache.flink.runtime.io.ne
>>>>>>> twork.partition.consumer.SingleInputGate.getNextBufferOrEven
>>>>>>> t(SingleInputGate.java:491)
>>>>>>>       at org.apache.flink.runtime.io.ne
>>>>>>> twork.api.reader.AbstractRecordReader.getNextRecord(Abstract
>>>>>>> RecordReader.java:86)
>>>>>>>       at org.apache.flink.runtime.io.ne
>>>>>>> twork.api.reader.MutableRecordReader.next(MutableRecordReade
>>>>>>> r.java:47)
>>>>>>>       at org.apache.flink.runtime.opera
>>>>>>> tors.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>       at org.apache.flink.runtime.opera
>>>>>>> tors.util.metrics.CountingMutableObjectIterator.next(Countin
>>>>>>> gMutableObjectIterator.java:36)
>>>>>>>       at org.apache.flink.runtime.opera
>>>>>>> tors.hash.MutableHashTable$ProbeIterator.next(MutableHashTab
>>>>>>> le.java:1929)
>>>>>>>       at org.apache.flink.runtime.opera
>>>>>>> tors.hash.MutableHashTable.processProbeIter(MutableHashTable
>>>>>>> .java:505)
>>>>>>>       at org.apache.flink.runtime.opera
>>>>>>> tors.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>>>>       at org.apache.flink.runtime.opera
>>>>>>> tors.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(
>>>>>>> ReusingBuildFirstHashJoinIterator.java:123)
>>>>>>>       at org.apache.flink.runtime.opera
>>>>>>> tors.JoinDriver.run(JoinDriver.java:221)
>>>>>>>       at org.apache.flink.runtime.opera
>>>>>>> tors.BatchTask.run(BatchTask.java:503)
>>>>>>>       at org.apache.flink.runtime.opera
>>>>>>> tors.BatchTask.invoke(BatchTask.java:368)
>>>>>>>       at org.apache.flink.runtime.taskm
>>>>>>> anager.Task.run(Task.java:703)
>>>>>>>       at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>> *Number 4:*
>>>>>>>
>>>>>>> "Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
>>>>>>>   java.lang.Thread.State: WAITING
>>>>>>>       at sun.misc.Unsafe.park(Unsafe.java:-1)
>>>>>>>       at java.util.concurrent.locks.Loc
>>>>>>> kSupport.park(LockSupport.java:175)
>>>>>>>       at java.util.concurrent.Completab
>>>>>>> leFuture$Signaller.block(CompletableFuture.java:1693)
>>>>>>>       at java.util.concurrent.ForkJoinP
>>>>>>> ool.managedBlock(ForkJoinPool.java:3323)
>>>>>>>       at java.util.concurrent.Completab
>>>>>>> leFuture.waitingGet(CompletableFuture.java:1729)
>>>>>>>       at java.util.concurrent.Completab
>>>>>>> leFuture.get(CompletableFuture.java:1895)
>>>>>>>       at org.apache.flink.runtime.minic
>>>>>>> luster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
>>>>>>>       at org.apache.flink.client.LocalE
>>>>>>> xecutor.executePlan(LocalExecutor.java:231)
>>>>>>>       - locked <0x23eb> (a java.lang.Object)
>>>>>>>       at org.apache.flink.api.java.Loca
>>>>>>> lEnvironment.execute(LocalEnvironment.java:91)
>>>>>>>       at org.apache.flink.api.java.Exec
>>>>>>> utionEnvironment.execute(ExecutionEnvironment.java:815)
>>>>>>>       at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
>>>>>>>       at my.package.algorithm.Misc.Summ
>>>>>>> aryGraphBuilder.summaryGraph(Misc.java:103)
>>>>>>>       at my.package.algorithm.Sample.co
>>>>>>> mputeApproximateDeltaFast(Sample.java:492)
>>>>>>>       at my.package.algorithm.Sample.run(Sample.java:291).
>>>>>>>       at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>> While I realize these dumps on their own may not be helpful, they at
>>>>>>> least (as far as I know) indicate that the threads are all waiting on
>>>>>>> something.
>>>>>>> But if it were resource scarcity, I believe the program would
>>>>>>> terminate with an exception.
>>>>>>> And if it were garbage collection activity, I believe the JVM process
>>>>>>> would not be at 0% CPU usage.
>>>>>>>
>>>>>>> *Note:* I realize I didn't provide the user code that generates the
>>>>>>> execution plan for Flink and led to the contexts in which the threads
>>>>>>> are waiting, but I hope it may not be necessary.
>>>>>>> My problem now is that I am unsure on how to proceed to further
>>>>>>> debug this issue:
>>>>>>> - The assigned memory is fully used, but there are no exceptions
>>>>>>> about lack of memory.
>>>>>>> - The CPU usage is at 0% and all threads are in a waiting state, but
>>>>>>> I don't understand what signal they're waiting for exactly.
>>>>>>>
>>>>>>> Hoping anyone might be able to give me a hint.
>>>>>>>
>>>>>>> Thank you very much for your time.
>>>>>>>
>>>>>>> Best regards,
>>>>>>>
>>>>>>> Miguel E. Coimbra
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
