Hello,

Just to provide a brief update: I got this working by moving to the stable
version 1.4.2.
I previously tested under 1.5-SNAPSHOT and 1.6-SNAPSHOT and the problem
occurred in both.

​If I'm not mistaken, LocalEnvironment is primarily ​targeted at debugging
scenarios.
In my case, I explicitly want to use it on a complex series of jobs for now.
However, it seems some sort of bug was introduced after 1.4.2?

I ask this because my same code leads to the operators stuck on
​
java.lang.Thread.State: WAITING in the snapshot versions but it works fine
in 1.4.2.
Was there any specific design change after 1.4.2 regarding the way the
Flink cluster is simulated (LocalFlinkMiniCluster if I'm not mistaken?)
when using LocalEnvironment?

I would like to explore this issue and perhaps contribute to fixing it or
at least understand.

Thank you very much.​


Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>

On 17 April 2018 at 22:52, Miguel Coimbra <miguel.e.coim...@gmail.com>
wrote:

> Hello James,
>
> Thanks for the information.
> I noticed something suspicious as well: I have chains of operators where
> the first operator will ingest the expected amount of records but will not
> emit any, leaving the following operator empty in a "RUNNING" state.
> For example:
>
>
>
> I will get back if I find out more.
>
>
> Best regards,
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>
> On 17 April 2018 at 20:59, James Yu <cyu...@gmail.com> wrote:
>
>> Miguel, I and my colleague ran into same problem yesterday.
>> We were expecting Flink to get 4 inputs from Kafka and write the inputs
>> to Cassandra, but the operators got stuck after the 1st input is written
>> into Cassandra.
>> This is how DAG looks like:
>> Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Cassandra Sink)
>> After we disable the auto chaining (https://ci.apache.org/project
>> s/flink/flink-docs-release-1.4/dev/stream/operators/#task-
>> chaining-and-resource-groups), all 4 inputs are read from Kafka and
>> written into Cassandra.
>> We are still figuring out why the chaining causes the blocking.
>>
>>
>> This is a UTF-8 formatted mail
>> -----------------------------------------------
>> James C.-C.Yu
>> +886988713275
>>
>> 2018-04-18 6:57 GMT+08:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>
>>> Chesnay, following your suggestions I got access to the web interface
>>> and also took a closer look at the debugging logs.
>>> I have noticed one problem regarding the web interface port - it keeps
>>> changing port now and then during my Java program's execution.
>>>
>>> Not sure if that is due to my program launching several job executions
>>> sequentially, but the fact is that it happened.
>>> Since I am accessing the web interface via tunneling, it becomes rather
>>> cumbersome to keep adapting it.
>>>
>>> Another particular problem I'm noticing is that this exception
>>> frequently pops up (debugging with log4j):
>>>
>>> 00:17:54,368 DEBUG org.apache.flink.runtime.jobma
>>> ster.slotpool.SlotPool          - Releasing slot with slot request id
>>> 9055ef473251505dac04c99727106dc9.
>>> org.apache.flink.util.FlinkException: Slot is being returned to the
>>> SlotPool.
>>>         at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$Provide
>>> rAndOwner.returnAllocatedSlot(SlotPool.java:1521)
>>>         at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlo
>>> t.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
>>>         at java.util.concurrent.CompletableFuture.uniHandle(Completable
>>> Future.java:822)
>>>         at java.util.concurrent.CompletableFuture.uniHandleStage(Comple
>>> tableFuture.java:834)
>>>         at java.util.concurrent.CompletableFuture.handle(CompletableFut
>>> ure.java:2155)
>>>         at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlo
>>> t.releaseSlot(SingleLogicalSlot.java:130)
>>>         at org.apache.flink.runtime.executiongraph.Execution.releaseAss
>>> ignedResource(Execution.java:1239)
>>>         at org.apache.flink.runtime.executiongraph.Execution.markFinish
>>> ed(Execution.java:946)
>>>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.updat
>>> eState(ExecutionGraph.java:1588)
>>>         at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecu
>>> tionState(JobMaster.java:593)
>>>         at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>> thodAccessorImpl.java:43)
>>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvo
>>> cation(AkkaRpcActor.java:210)
>>>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage
>>> (AkkaRpcActor.java:154)
>>>         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleM
>>> essage(FencedAkkaRpcActor.java:66)
>>>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onRece
>>> ive$1(AkkaRpcActor.java:132)
>>>         at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell
>>> .scala:544)
>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>         at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.j
>>> ava:260)
>>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(For
>>> kJoinPool.java:1339)
>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
>>> l.java:1979)
>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
>>> orkerThread.java:107)
>>>
>>> Don't know if the internals of Flink are explicitly using an exception
>>> for control flow, but there are several occurrences of this as time goes by.
>>>
>>> Regarding my program itself, I've achieved some progress.
>>> In my program I need to do a sequence of series of Flink jobs, and need
>>> extra care to make sure no DataSet instance from job *i* is being used
>>> in an operator in job *i + 1*.
>>> I believe this was generating the waiting scenarios I describe in an
>>> earlier email.
>>> The bottom line is to be extra careful about when job executions are
>>> actually triggered and to make sure that a DataSet which will need to
>>> be used in different Flink jobs is available for example as a file in
>>> secondary storage (possibly masked as a memory-mapping) and is exclusively
>>> read from that source.
>>> This means ensuring the job that originally produces a DataSet (for
>>> reuse on a later job) assigns to it a DataSink for secondary storage.
>>>
>>> I'm going to keep digging taking this in account - if will report back
>>> if I manage to fix everything or find a new problem.
>>>
>>> Thanks again,
>>>
>>>
>>>
>>> Miguel E. Coimbra
>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>
>>> On 16 April 2018 at 10:26, Chesnay Schepler <ches...@apache.org> wrote:
>>>
>>>> ah yes, currently when you use that method the UI is started on a
>>>> random port. I'm currently fixing that in this PR
>>>> <https://github.com/apache/flink/pull/5814> that will be merged today.
>>>> For now you will enable logging and search for something along the lines of
>>>> "http://<host>:<port> was granted leadership"
>>>>
>>>> Sorry for the inconvenience.
>>>>
>>>> On 16.04.2018 15:04, Miguel Coimbra wrote:
>>>>
>>>> Thanks for the suggestions Chesnay, I will try them out.
>>>>
>>>> However, I have already tried your suggestion with the dependency
>>>> flink-runtime-web and nothing happened.
>>>> If I understood you correctly, adding that dependency in the pom.xml
>>>> would make it so the web front-end is running when I call the following
>>>> line?
>>>>
>>>> LocalEnvironment lenv = (LocalEnvironment)
>>>> ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>
>>>> I added flink-runtime-web  in my pom.xml, recompiled and launched the
>>>> program but I simply got "Unable to connect" in my browser (Firefox) on
>>>> localhost:8081.
>>>> Performing wget on localhost:8081 resulted in this:
>>>>
>>>> $ wget localhost:8081
>>>> --2018-04-16 12:47:26--  http://localhost:8081/
>>>> Resolving localhost (localhost)... ::1, 127.0.0.1
>>>> Connecting to localhost (localhost)|::1|:8081... failed: Connection
>>>> refused.
>>>> Connecting to localhost (localhost)|127.0.0.1|:8081... failed:
>>>> Connection refused.
>>>>
>>>> It seems something was bound to localhost:8081 but the connection is
>>>> not working for some reason.
>>>> I probably am skipping some important detail.
>>>> These are some of my dependencies:
>>>>
>>>> <dependency>
>>>>     <groupId>org.apache.flink</groupId>
>>>>     <artifactId>flink-java</artifactId>
>>>>     <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.flink</groupId>
>>>>     <artifactId>flink-core</artifactId>
>>>>     <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.flink</groupId>
>>>>     <artifactId>flink-clients_${scala.binary.version}</artifactId>
>>>>     <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.flink</groupId>
>>>>     <artifactId>flink-gelly_${scala.binary.version}</artifactId>
>>>>     <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.flink</groupId>
>>>>     <artifactId>flink-gelly-examples_${scala.binary.version}</ar
>>>> tifactId>
>>>>     <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.flink</groupId>
>>>>     <artifactId>flink-streaming-java_${scala.binary.version}</ar
>>>> tifactId>
>>>>     <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>>      <groupId>org.apache.flink</groupId>
>>>>      <artifactId>flink-streaming-scala_${scala.binary.version}</
>>>> artifactId>
>>>>      <version>${flink.version}</version>
>>>> </dependency>
>>>> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-ru
>>>> ntime-web -->
>>>>
>>>>
>>>>
>>>>
>>>> *<dependency>      <groupId>org.apache.flink</groupId>
>>>>  <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
>>>>  <version>${flink.version}</version> </dependency>*
>>>>
>>>> Have you managed to get the web front-end in local mode?
>>>>
>>>>
>>>> Best regards,
>>>>
>>>> Miguel E. Coimbra
>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>
>>>> On 16 April 2018 at 05:12, Chesnay Schepler <ches...@apache.org> wrote:
>>>>
>>>>> The thing with createLocalEnvironmentWithWebUI is that it requires
>>>>> flink-runtime-web to be on the classpath, which is rarely the class
>>>>> when running things in the IDE.
>>>>> It should work fine in the IDE if you add it as a dependency to your
>>>>> project. This should've been logged as a warning.
>>>>>
>>>>> Chaining is unrelated to this issue as join operators are never
>>>>> chained to one another.
>>>>> Lambda functions are also not the issue, if they were the job would
>>>>> fail much earlier.
>>>>>
>>>>> It is reasonable that T3 is blocked if T1 is blocked. T1 gets no input
>>>>> hence produces no output, which now also blocks T3.
>>>>>
>>>>> There are multiple possible explanations i can come up with:
>>>>> * the preceding operators are blocked on something or *really *slow
>>>>> * the preceding operators are actually finished, but aren't shutting
>>>>> down due to an implementation error
>>>>> * a deadlock in Flink's join logic
>>>>> * a deadlock in Flink's network stack
>>>>>
>>>>> For the first 2 we will have to consult the UI or logs. You said you
>>>>> were dumping the input DataSets into files, but were they actually 
>>>>> complete?
>>>>>
>>>>> A deadlock in the network stack should appear as all existing operator
>>>>> threads being blocked.
>>>>> We can probably rule out a problem with the join logic by removing the
>>>>> second join and trying again.
>>>>>
>>>>>
>>>>>
>>>>> On 16.04.2018 03:10, Miguel Coimbra wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> It would seem that the function which is supposed to launch local mode
>>>>> with the web front-end doesn't launch the front-end at all...
>>>>> This function seems not to be doing what it is supposed to do, if I'm
>>>>> not mistaken:
>>>>>
>>>>> LocalEnvironment lenv = (LocalEnvironment)
>>>>> ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>>
>>>>> Regarding the preceding operators, the thread dumps I got were
>>>>> pointing to a specific set of operations over DataSet instances that
>>>>> were passed into my function.
>>>>> Below I show the code segment and put the lines where threads are
>>>>> waiting in *bold*:
>>>>>
>>>>> public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(final
>>>>> Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
>>>>>     return vertices
>>>>>             .joinWithHuge(originalGraph.getEdges())
>>>>>             .where(0).equalTo(0)
>>>>> *            .with((source, edge) -> edge)* *// Thread 1 is blocked
>>>>> here*
>>>>>             .returns(originalGraph.getEdges().getType())
>>>>>             .join(vertices)
>>>>>             .where(1).equalTo(0)
>>>>> *            .with((e, v) -> e) // Thread 3 is blocked here*
>>>>>             .returns(originalGraph.getEdges().getType())
>>>>>             .distinct(0, 1);
>>>>> }
>>>>>
>>>>> Note: the edges inside the graph originalGraph edge DataSet are much
>>>>> greater in number than the elements of the vertices DataSet, so I
>>>>> believe that function is being used correctly.
>>>>>
>>>>> I will try testing with remote (cluster) mode to have access to the
>>>>> web front-end, but I have some questions for now:
>>>>>
>>>>> - The fact that they are blocked in different ​JoinOperator instances
>>>>> that are chained, is this a result of Flink's default pipeline mechanism?
>>>>> - Could there be a problem stemming from the fact they are both
>>>>> waiting on lambdas?
>>>>> - I have tried dumping both DataSet variables originalGraph and vertices
>>>>> into files (the ones being used in this code), and they produced
>>>>> correct values (non-empty files), so I don't have a clue what the threads
>>>>> inside Flink's runtime are waiting on.
>>>>>
>>>>> ​Thanks for the help so far Chesnay.​
>>>>>
>>>>>
>>>>> Miguel E. Coimbra
>>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>>
>>>>> ---------- Forwarded message ----------
>>>>>
>>>>>> From: Chesnay Schepler <ches...@apache.org>
>>>>>> To: user@flink.apache.org
>>>>>> Cc:
>>>>>> Bcc:
>>>>>> Date: Sun, 15 Apr 2018 18:54:33 +0200
>>>>>> Subject: Re: Unsure how to further debug - operator threads stuck on
>>>>>> java.lang.Thread.State: WAITING
>>>>>> Hello,
>>>>>>
>>>>>> Thread #1-3 are waiting for input, Thread #4 is waiting for the job
>>>>>> to finish.
>>>>>>
>>>>>> To further debug this I would look into what the preceding operators
>>>>>> are doing, whether they are blocked on something or are emitting records
>>>>>> (which you can check in the UI/metrics).
>>>>>>
>>>>>> On 15.04.2018 18:40, Miguel Coimbra wrote:
>>>>>>
>>>>>> ​Hello,
>>>>>>
>>>>>> I am running into a situation where the Flink threads responsible for
>>>>>> my operator execution are all stuck on WAITING mode.
>>>>>> Before anything else, this is my machine's spec:
>>>>>>
>>>>>> Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830  @ 2.13GHz
>>>>>> GenuineIntel GNU/Linux
>>>>>> 256 GB RAM
>>>>>>
>>>>>> I am running in local mode on a machine with a considerable amount of
>>>>>> memory, so perhaps that may be triggering some execution edge-case?
>>>>>>
>>>>>> Moving on, this is my Java:
>>>>>>
>>>>>> openjdk version "1.8.0_151"
>>>>>> OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
>>>>>> OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
>>>>>>
>>>>>> Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT
>>>>>> with LocalEnvironment on this large-memory machine, with parallelism
>>>>>> set to one:
>>>>>>
>>>>>> Configuration conf = new Configuration();
>>>>>> LocalEnvironment lenv = (LocalEnvironment)
>>>>>> ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>>> ExecutionEnvironment env = lenv;
>>>>>> env.getConfig().enableSysoutLogging().enableClosureCleaner()
>>>>>> .enableObjectReuse();
>>>>>> env.setParallelism(1);
>>>>>>
>>>>>> This initializes the execution environment for a series of sequential
>>>>>> jobs (any data dependency between jobs is flushed to disk on job *i *and
>>>>>> read back from disk into a DataSet in job *i + 1*).
>>>>>> To reiterate, I am not launching a Flink cluster, I am just executing
>>>>>> in local mode from a code base compiled with Maven.
>>>>>>
>>>>>> I have tested this program via mvn exec:exec with different values
>>>>>> of memory (from -Xmx20000m to -Xmx120000m, from 20GB to 120GB) and
>>>>>> the result is always the same: the process' memory fills up completely 
>>>>>> and
>>>>>> then the process' CPU usage drops to 0%.
>>>>>> This is strange because if it was lack of memory, I would expect an
>>>>>> OutOfMemoryError.
>>>>>>
>>>>>> I have debugged with IntelliJ IDEA and obtained thread dumps from
>>>>>> different executions, and realized quite a few operator threads are stuck
>>>>>> on
>>>>>> ​​
>>>>>> java.lang.Thread.State: WAITING.
>>>>>>
>>>>>> There are four major threads that I find to be in this waiting state.
>>>>>> The thread dumps I obtained show me where the wait calls originated:
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Number 1: *"CHAIN Join (Join at selectEdges(GraphUtils.java:328))
>>>>>> -> Combine (Distinct at selectEdges(GraphUtils.java:330))
>>>>>> (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
>>>>>>   java.lang.Thread.State: WAITING
>>>>>>       at java.lang.Object.wait(Object.java:-1)
>>>>>>       at java.lang.Object.wait(Object.java:502)
>>>>>>       at org.apache.flink.runtime.io.network.partition.consumer.Singl
>>>>>> eInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>>>       at org.apache.flink.runtime.io.network.partition.consumer.Singl
>>>>>> eInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>>>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>>>> dReader.getNextRecord(AbstractRecordReader.java:86)
>>>>>>       at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>>>> Reader.next(MutableRecordReader.java:47)
>>>>>>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>>>> ReaderIterator.java:59)
>>>>>>       at org.apache.flink.runtime.operators.util.metrics.CountingMuta
>>>>>> bleObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable$Pro
>>>>>> beIterator.next(MutableHashTable.java:1929)
>>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.pro
>>>>>> cessProbeIter(MutableHashTable.java:505)
>>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nex
>>>>>> tRecord(MutableHashTable.java:666)
>>>>>>       at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHa
>>>>>> shJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIte
>>>>>> rator.java:122)
>>>>>>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver
>>>>>> .java:221)
>>>>>>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>>>> ava:503)
>>>>>>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>>>>>> k.java:368)
>>>>>>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>>>       at java.lang.Thread.run(Thread.java:748)
>>>>>>
>>>>>>
>>>>>> *Number 2:*
>>>>>>
>>>>>> "Join (Join at summaryGraph(SummaryGraphBuilder.java:92))
>>>>>> (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
>>>>>>   java.lang.Thread.State: WAITING
>>>>>>       at java.lang.Object.wait(Object.java:-1)
>>>>>>       at java.lang.Object.wait(Object.java:502)
>>>>>>       at org.apache.flink.runtime.io.network.partition.consumer.Singl
>>>>>> eInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>>>       at org.apache.flink.runtime.io.network.partition.consumer.Singl
>>>>>> eInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>>>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>>>> dReader.getNextRecord(AbstractRecordReader.java:86)
>>>>>>       at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>>>> Reader.next(MutableRecordReader.java:47)
>>>>>>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>>>> ReaderIterator.java:59)
>>>>>>       at org.apache.flink.runtime.operators.util.metrics.CountingMuta
>>>>>> bleObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable$Pro
>>>>>> beIterator.next(MutableHashTable.java:1929)
>>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.pro
>>>>>> cessProbeIter(MutableHashTable.java:505)
>>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nex
>>>>>> tRecord(MutableHashTable.java:666)
>>>>>>       at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHa
>>>>>> shJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIte
>>>>>> rator.java:122)
>>>>>>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver
>>>>>> .java:221)
>>>>>>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>>>> ava:503)
>>>>>>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>>>>>> k.java:368)
>>>>>>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>>>       at java.lang.Thread.run(Thread.java:748)
>>>>>>
>>>>>> *Number 3:*
>>>>>>
>>>>>> "Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5
>>>>>> tid=0xd75 nid=NA waiting
>>>>>>   java.lang.Thread.State: WAITING
>>>>>>       at java.lang.Object.wait(Object.java:-1)
>>>>>>       at java.lang.Object.wait(Object.java:502)
>>>>>>       at org.apache.flink.runtime.io.network.partition.consumer.Singl
>>>>>> eInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>>>       at org.apache.flink.runtime.io.network.partition.consumer.Singl
>>>>>> eInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>>>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>>>> dReader.getNextRecord(AbstractRecordReader.java:86)
>>>>>>       at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>>>> Reader.next(MutableRecordReader.java:47)
>>>>>>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>>>> ReaderIterator.java:59)
>>>>>>       at org.apache.flink.runtime.operators.util.metrics.CountingMuta
>>>>>> bleObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable$Pro
>>>>>> beIterator.next(MutableHashTable.java:1929)
>>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.pro
>>>>>> cessProbeIter(MutableHashTable.java:505)
>>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nex
>>>>>> tRecord(MutableHashTable.java:666)
>>>>>>       at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHas
>>>>>> hJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinItera
>>>>>> tor.java:123)
>>>>>>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver
>>>>>> .java:221)
>>>>>>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>>>> ava:503)
>>>>>>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>>>>>> k.java:368)
>>>>>>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>>>       at java.lang.Thread.run(Thread.java:748)
>>>>>>
>>>>>> *Number 4:*
>>>>>>
>>>>>> "Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
>>>>>>   java.lang.Thread.State: WAITING
>>>>>>       at sun.misc.Unsafe.park(Unsafe.java:-1)
>>>>>>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java
>>>>>> :175)
>>>>>>       at java.util.concurrent.CompletableFuture$Signaller.block(Compl
>>>>>> etableFuture.java:1693)
>>>>>>       at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.
>>>>>> java:3323)
>>>>>>       at java.util.concurrent.CompletableFuture.waitingGet(Completabl
>>>>>> eFuture.java:1729)
>>>>>>       at java.util.concurrent.CompletableFuture.get(CompletableFuture
>>>>>> .java:1895)
>>>>>>       at org.apache.flink.runtime.minicluster.MiniCluster.executeJobB
>>>>>> locking(MiniCluster.java:519)
>>>>>>       at org.apache.flink.client.LocalExecutor.executePlan(LocalExecu
>>>>>> tor.java:231)
>>>>>>       - locked <0x23eb> (a java.lang.Object)
>>>>>>       at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvi
>>>>>> ronment.java:91)
>>>>>>       at org.apache.flink.api.java.ExecutionEnvironment.execute(Execu
>>>>>> tionEnvironment.java:815)
>>>>>>       at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
>>>>>>       at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(M
>>>>>> isc.java:103)
>>>>>>       at my.package.algorithm.Sample.computeApproximateDeltaFast(Samp
>>>>>> le.java:492)
>>>>>>       at my.package.algorithm.Sample.run(Sample.java:291).
>>>>>>       at java.lang.Thread.run(Thread.java:748)
>>>>>>
>>>>>> While I realize these dumps on their own may not be helpful, they at
>>>>>> least (as far as I know) indicate that the threads are all waiting on
>>>>>> something.
>>>>>> But if it was resource scarcity I believe the program would terminate
>>>>>> with an exception.
>>>>>> And if it was garbage collection activity, I believe the JVM process
>>>>>> would not be at 0% CPU usage.
>>>>>>
>>>>>> *Note: *I realize I didn't provide the user-code code that generates
>>>>>> the execution plan for Flink which led to the contexts in which the 
>>>>>> threads
>>>>>> are waiting, but I hope it may not be necessary.
>>>>>> My problem now is that I am unsure on how to proceed to further debug
>>>>>> this issue:
>>>>>> - The assigned memory is fully used, but there are no exceptions
>>>>>> about lack of memory.
>>>>>> - The CPU usage is at 0% and all threads are all in a waiting state,
>>>>>> but I don't understand what signal they're waiting for exactly.
>>>>>>
>>>>>> Hoping anyone might be able to give me a hint.
>>>>>>
>>>>>> Thank you very much for your time.
>>>>>>
>>>>>> Best regards,
>>>>>>
>>>>>> Miguel E. Coimbra
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to