Thanks for the suggestions, Chesnay. I will try them out.

However, I had already tried your suggestion of adding the flink-runtime-web
dependency, and nothing changed.
If I understood you correctly, adding that dependency to the pom.xml should
make the web front-end start when I call the following line?

LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

I added flink-runtime-web to my pom.xml, recompiled and launched the
program, but I simply got "Unable to connect" in my browser (Firefox) on
localhost:8081.
Performing wget on localhost:8081 resulted in this:

$ wget localhost:8081
--2018-04-16 12:47:26--  http://localhost:8081/
Resolving localhost (localhost)... ::1, 127.0.0.1
Connecting to localhost (localhost)|::1|:8081... failed: Connection refused.
Connecting to localhost (localhost)|127.0.0.1|:8081... failed: Connection refused.
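A quick way to check whether anything is listening on the port at all, independent of the browser, is a plain TCP connect. The sketch below uses only the JDK; the class and method names (PortProbe, isListening) are hypothetical, and 8081 is simply the port tried above. A "Connection refused" like the one wget printed comes back as false.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortProbe {
    /**
     * Returns true if a TCP connection to host:port can be opened within timeoutMillis.
     * A refused connection, a timeout or an unknown host all yield false.
     */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // ConnectException ("Connection refused"), SocketTimeoutException, UnknownHostException, ...
            return false;
        }
    }

    public static void main(String[] args) {
        // 8081 is the port the browser and wget were pointed at.
        System.out.println("localhost:8081 listening? " + isListening("localhost", 8081, 1000));
    }
}
```

If this prints false while the job is running, no server socket was opened on that port, which would point at the web UI never being started rather than at a broken connection.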

Since the connection is refused on both ::1 and 127.0.0.1, it seems that
nothing is actually listening on localhost:8081.
I am probably missing some important detail.
These are some of my dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
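Since the web UI only starts when flink-runtime-web is actually on the runtime classpath, it may be worth confirming that the jar reaches the JVM that runs the job; with mvn exec:exec the forked JVM gets whatever classpath the plugin configuration passes to it. The sketch below is plain JDK code with hypothetical names. The class it probes (org.apache.flink.runtime.webmonitor.WebRuntimeMonitor) is an assumption about what the flink-runtime-web jar contains and should be checked against the jar actually in use.

```java
final class ClasspathCheck {
    /** Returns true if className can be loaded (without running its static initializers). */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // ASSUMPTION: a class that ships in flink-runtime-web; verify the name against your jar.
        String webUiClass = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
        System.out.println(webUiClass + " loadable: " + isOnClasspath(webUiClass));
        // Look for a flink-runtime-web_<scala version>-<flink version>.jar entry in this list.
        System.out.println("java.class.path = " + System.getProperty("java.class.path"));
    }
}
```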

Have you managed to get the web front-end working in local mode?


Best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>

On 16 April 2018 at 05:12, Chesnay Schepler <ches...@apache.org> wrote:

> The thing with createLocalEnvironmentWithWebUI is that it requires
> flink-runtime-web to be on the classpath, which is rarely the case when
> running things in the IDE.
> It should work fine in the IDE if you add it as a dependency to your
> project. This should've been logged as a warning.
>
> Chaining is unrelated to this issue as join operators are never chained to
> one another.
> Lambda functions are also not the issue; if they were, the job would fail
> much earlier.
>
> It is reasonable that T3 is blocked if T1 is blocked. T1 gets no input
> hence produces no output, which now also blocks T3.
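To make the T1/T3 dependency concrete, here is a toy model in plain Java (not Flink's runtime): two stages connected by queues, where each stage can only forward what it receives. The names StarvedPipeline, run and forward are hypothetical, and the stages poll with a timeout instead of blocking forever so that the starvation is observable; the Flink tasks in the dumps instead sit in Object.wait() inside SingleInputGate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class StarvedPipeline {
    /** Feeds sourceRecords into stage T1, which forwards to stage T3; returns what T3 emitted. */
    static List<String> run(List<String> sourceRecords, long waitMillis) throws InterruptedException {
        BlockingQueue<String> intoT1 = new LinkedBlockingQueue<>(sourceRecords);
        BlockingQueue<String> t1ToT3 = new LinkedBlockingQueue<>();
        List<String> t3Output = Collections.synchronizedList(new ArrayList<>());

        Thread t1 = new Thread(() -> forward(intoT1, t1ToT3::add, waitMillis), "T1");
        Thread t3 = new Thread(() -> forward(t1ToT3, t3Output::add, waitMillis), "T3");
        t1.start();
        t3.start();
        t1.join();
        t3.join();
        return t3Output;
    }

    /** Forwards records until none arrives within waitMillis (a stand-in for end of input). */
    private static void forward(BlockingQueue<String> in, Consumer<String> out, long waitMillis) {
        try {
            String record;
            while ((record = in.poll(waitMillis, TimeUnit.MILLISECONDS)) != null) {
                out.accept(record);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(Arrays.asList("e1", "e2"), 200));        // upstream has input
        System.out.println(run(Collections.<String>emptyList(), 200)); // upstream starved: T3 emits nothing
    }
}
```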
>
> There are multiple possible explanations I can come up with:
> * the preceding operators are blocked on something or *really* slow
> * the preceding operators are actually finished, but aren't shutting down
> due to an implementation error
> * a deadlock in Flink's join logic
> * a deadlock in Flink's network stack
>
> For the first 2 we will have to consult the UI or logs. You said you were
> dumping the input DataSets into files, but were they actually complete?
>
> A deadlock in the network stack should appear as all existing operator
> threads being blocked.
> We can probably rule out a problem with the join logic by removing the
> second join and trying again.
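One cheap check for the deadlock hypotheses is the JVM's own deadlock detector, callable from inside the process through ThreadMXBean. A caveat worth keeping in mind: findDeadlockedThreads only reports cycles of threads blocked on monitors or java.util.concurrent locks, so threads parked in Object.wait() for data that never arrives (as in the dumps in this thread) will not show up, and an empty result does not rule out a stall in the network stack. The class and method names below are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

final class DeadlockCheck {
    /** Names of threads the JVM reports as deadlocked; empty if none. */
    static List<String> deadlockedThreadNames() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        long[] ids = threads.findDeadlockedThreads(); // null when no deadlock is found
        List<String> names = new ArrayList<>();
        if (ids == null) {
            return names;
        }
        for (ThreadInfo info : threads.getThreadInfo(ids)) {
            if (info != null) { // null if the thread has terminated in the meantime
                names.add(info.getThreadName());
            }
        }
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        // A thread waiting for a notify() that never comes: WAITING, but not a deadlock.
        Object lock = new Object();
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "waiter");
        waiter.setDaemon(true);
        waiter.start();
        Thread.sleep(200);
        System.out.println(waiter.getState() + ", deadlocked: " + deadlockedThreadNames());
    }
}
```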
>
>
>
> On 16.04.2018 03:10, Miguel Coimbra wrote:
>
> Hello,
>
> It would seem that the function which is supposed to launch local mode
> with the web front-end doesn't actually launch the front-end, if I'm not
> mistaken:
>
> LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>
> Regarding the preceding operators, the thread dumps I got were pointing to
> a specific set of operations over DataSet instances that were passed into
> my function.
> Below is the code segment; the comments mark the lines where threads are
> waiting:
>
> public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(
>         final Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
>     return vertices
>             .joinWithHuge(originalGraph.getEdges())
>             .where(0).equalTo(0)
>             .with((source, edge) -> edge)   // Thread 1 is blocked here
>             .returns(originalGraph.getEdges().getType())
>             .join(vertices)
>             .where(1).equalTo(0)
>             .with((e, v) -> e)              // Thread 3 is blocked here
>             .returns(originalGraph.getEdges().getType())
>             .distinct(0, 1);
> }
>
> Note: the edge DataSet of originalGraph contains far more elements than
> the vertices DataSet, so I believe joinWithHuge is being used correctly.
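To compare against Flink's output on a small sample, and to help rule out the join logic, an in-memory reference of what selectEdges computes may be useful: keep an edge only if its source (first join, where(0).equalTo(0)) and its target (second join, where(1).equalTo(0)) are both vertex ids, then drop duplicate (source, target) pairs as distinct(0, 1) does. This is plain Java with a minimal, hypothetical Edge class standing in for Gelly's Edge; which duplicate survives (the first one seen) is a choice of this sketch, not something Flink guarantees.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class SelectEdgesReference {
    /** Minimal stand-in for Gelly's Edge<K, EV>: fields 0, 1, 2 = source, target, value. */
    static final class Edge<K, V> {
        final K source;
        final K target;
        final V value;

        Edge(K source, K target, V value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }

        @Override
        public String toString() {
            return "(" + source + ", " + target + ", " + value + ")";
        }
    }

    static <K, V> List<Edge<K, V>> selectEdges(Collection<Edge<K, V>> edges, Set<K> vertexIds) {
        Map<Map.Entry<K, K>, Edge<K, V>> byEndpoints = new LinkedHashMap<>();
        for (Edge<K, V> edge : edges) {
            boolean sourceMatches = vertexIds.contains(edge.source); // first join
            boolean targetMatches = vertexIds.contains(edge.target); // second join
            if (sourceMatches && targetMatches) {
                // distinct(0, 1): keep one edge per (source, target) pair
                byEndpoints.putIfAbsent(new AbstractMap.SimpleImmutableEntry<>(edge.source, edge.target), edge);
            }
        }
        return new ArrayList<>(byEndpoints.values());
    }

    public static void main(String[] args) {
        List<Edge<Integer, String>> edges = Arrays.asList(
                new Edge<>(1, 2, "a"), new Edge<>(2, 3, "b"), new Edge<>(1, 2, "c"), new Edge<>(3, 4, "d"));
        Set<Integer> vertexIds = new HashSet<>(Arrays.asList(1, 2, 3));
        System.out.println(selectEdges(edges, vertexIds)); // prints [(1, 2, a), (2, 3, b)]
    }
}
```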
>
> I will try testing with remote (cluster) mode to have access to the web
> front-end, but I have some questions for now:
>
> - The fact that they are blocked in different JoinOperator instances
> that are chained: is this a result of Flink's default pipelining mechanism?
> - Could there be a problem stemming from the fact they are both waiting on
> lambdas?
> - I have tried dumping both originalGraph and vertices (the ones being
> used in this code) into files, and they produced correct values (non-empty
> files), so I don't know what the threads inside Flink's runtime are
> waiting on.
>
> Thanks for the help so far, Chesnay.
>
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>
> ---------- Forwarded message ----------
>
>> From: Chesnay Schepler <ches...@apache.org>
>> To: user@flink.apache.org
>> Cc:
>> Bcc:
>> Date: Sun, 15 Apr 2018 18:54:33 +0200
>> Subject: Re: Unsure how to further debug - operator threads stuck on
>> java.lang.Thread.State: WAITING
>> Hello,
>>
>> Threads #1-3 are waiting for input; thread #4 is waiting for the job to
>> finish.
>>
>> To further debug this I would look into what the preceding operators are
>> doing, whether they are blocked on something or are emitting records (which
>> you can check in the UI/metrics).
>>
>> On 15.04.2018 18:40, Miguel Coimbra wrote:
>>
>> Hello,
>>
>> I am running into a situation where the Flink threads responsible for my
>> operator execution are all stuck in the WAITING state.
>> Before anything else, this is my machine's spec:
>>
>> Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830  @ 2.13GHz
>> GenuineIntel GNU/Linux
>> 256 GB RAM
>>
>> I am running in local mode on a machine with a considerable amount of
>> memory; could that be triggering some execution edge case?
>>
>> Moving on, this is my Java:
>>
>> openjdk version "1.8.0_151"
>> OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
>> OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
>>
>> Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT with
>> LocalEnvironment on this large-memory machine, with parallelism set to one:
>>
>> Configuration conf = new Configuration();
>> LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>> ExecutionEnvironment env = lenv;
>> env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
>> env.setParallelism(1);
>>
>> This initializes the execution environment for a series of sequential
>> jobs (any data dependency between jobs is flushed to disk in job *i* and
>> read back from disk into a DataSet in job *i + 1*).
>> To reiterate, I am not launching a Flink cluster, I am just executing in
>> local mode from a code base compiled with Maven.
>>
>> I have tested this program via mvn exec:exec with different values of
>> memory (from -Xmx20000m to -Xmx120000m, from 20GB to 120GB) and the
>> result is always the same: the process' memory fills up completely and then
>> the process' CPU usage drops to 0%.
>> This is strange because if it were a lack of memory, I would expect an
>> OutOfMemoryError.
>>
>> I have debugged with IntelliJ IDEA, obtained thread dumps from different
>> executions, and noticed that quite a few operator threads are stuck in
>> java.lang.Thread.State: WAITING.
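Besides IntelliJ's thread dumps (or jstack, which prints the same kind of information), a small in-process helper can summarize how many threads are in each state, optionally filtered by a substring of the thread name such as "Join". The class name and the filter argument are illustrative, not part of Flink. Thread.getAllStackTraces only sees threads of the JVM it runs in; in local mode that JVM also hosts the Flink task threads, as the dumps in this thread show.

```java
import java.util.EnumMap;
import java.util.Map;

final class ThreadStates {
    /** Counts live threads per Thread.State whose name contains nameFilter ("" matches all). */
    static Map<Thread.State, Integer> histogram(String nameFilter) {
        Map<Thread.State, Integer> counts = new EnumMap<>(Thread.State.class);
        for (Thread thread : Thread.getAllStackTraces().keySet()) {
            if (thread.getName().contains(nameFilter)) {
                counts.merge(thread.getState(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Flink task threads are named after their operators, e.g. "Join (Join at ...) (1/1)".
        System.out.println("all threads:  " + histogram(""));
        System.out.println("join threads: " + histogram("Join"));
    }
}
```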
>>
>> There are four major threads that I find to be in this waiting state.
>> The thread dumps I obtained show me where the wait calls originated:
>>
>> *Number 1:*
>>
>> "CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
>>   java.lang.Thread.State: WAITING
>>       at java.lang.Object.wait(Object.java:-1)
>>       at java.lang.Object.wait(Object.java:502)
>>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>       at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>       at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>       at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>       at java.lang.Thread.run(Thread.java:748)
>>
>>
>> *Number 2:*
>>
>> "Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
>>   java.lang.Thread.State: WAITING
>>       at java.lang.Object.wait(Object.java:-1)
>>       at java.lang.Object.wait(Object.java:502)
>>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>       at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>       at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>       at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>       at java.lang.Thread.run(Thread.java:748)
>>
>> *Number 3:*
>>
>> "Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
>>   java.lang.Thread.State: WAITING
>>       at java.lang.Object.wait(Object.java:-1)
>>       at java.lang.Object.wait(Object.java:502)
>>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>       at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>       at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>       at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
>>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>       at java.lang.Thread.run(Thread.java:748)
>>
>> *Number 4:*
>>
>> "Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
>>   java.lang.Thread.State: WAITING
>>       at sun.misc.Unsafe.park(Unsafe.java:-1)
>>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>       at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>>       at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>>       at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>>       at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>       at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
>>       at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
>>       - locked <0x23eb> (a java.lang.Object)
>>       at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>>       at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
>>       at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
>>       at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
>>       at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
>>       at my.package.algorithm.Sample.run(Sample.java:291)
>>       at java.lang.Thread.run(Thread.java:748)
>>
>> While I realize these dumps on their own may not be very helpful, they at
>> least (as far as I know) indicate that all the threads are waiting on
>> something.
>> If it were resource scarcity, I believe the program would terminate
>> with an exception.
>> And if it were garbage collection activity, I believe the JVM process
>> would not be at 0% CPU usage.
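The garbage-collection argument can be checked rather than inferred: the JVM exposes cumulative collection counts and times through GarbageCollectorMXBean, so two snapshots taken a few seconds apart while the job is stuck (for example from a background thread started in the same JVM) show whether the collector is doing anything. The sketch below is plain JDK code with hypothetical names; similar numbers are available from outside the process with jstat -gcutil <pid>.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

final class GcSnapshot {
    /** Sum of collection counts over all collectors (collectors reporting -1 are skipped). */
    static long totalCollections() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long count = gc.getCollectionCount(); // -1 if undefined for this collector
            if (count > 0) {
                total += count;
            }
        }
        return total;
    }

    /** Sum of accumulated collection time in milliseconds over all collectors. */
    static long totalCollectionMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long millis = gc.getCollectionTime(); // -1 if undefined for this collector
            if (millis > 0) {
                total += millis;
            }
        }
        return total;
    }

    static String heapSummary() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return String.format("heap used=%d MiB, committed=%d MiB, max=%d MiB",
                heap.getUsed() >> 20, heap.getCommitted() >> 20, heap.getMax() >> 20);
    }

    public static void main(String[] args) throws InterruptedException {
        long countBefore = totalCollections();
        long millisBefore = totalCollectionMillis();
        Thread.sleep(5_000);
        // If both deltas stay at 0 while CPU is at 0%, the JVM is not busy collecting garbage.
        System.out.println(heapSummary());
        System.out.println("collections in 5 s: " + (totalCollections() - countBefore)
                + ", GC time: " + (totalCollectionMillis() - millisBefore) + " ms");
    }
}
```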
>>
>> *Note:* I realize I didn't provide the user code that generates the
>> execution plan for Flink which led to the contexts in which the threads are
>> waiting, but I hope it is not necessary.
>> My problem now is that I am unsure how to debug this issue further:
>> - The assigned memory is fully used, but there are no exceptions about
>> lack of memory.
>> - The CPU usage is at 0% and all threads are in a waiting state, but
>> I don't understand exactly which signal they are waiting for.
>>
>> I hope someone might be able to give me a hint.
>>
>> Thank you very much for your time.
>>
>> Best regards,
>>
>> Miguel E. Coimbra
>>
>>
>
