Thanks for the suggestions, Chesnay, I will try them out.

However, I have already tried your suggestion with the flink-runtime-web dependency and nothing happened.
If I understood you correctly, adding that dependency to the pom.xml should make the web front-end run when I call the following line?

LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

I added flink-runtime-web to my pom.xml, recompiled and launched the program, but I simply got "Unable to connect" in my browser (Firefox) on localhost:8081.
Performing wget on localhost:8081 resulted in this:

$ wget localhost:8081
--2018-04-16 12:47:26--  http://localhost:8081/
Resolving localhost (localhost)... ::1, 127.0.0.1
Connecting to localhost (localhost)|::1|:8081... failed: Connection refused.
Connecting to localhost (localhost)|127.0.0.1|:8081... failed: Connection refused.

It seems something was bound to localhost:8081 but the connection is not working for some reason.
I am probably skipping some important detail.
These are some of my dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
*<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>*

Have you managed to get the web front-end in local mode?

Best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>

On 16 April 2018 at 05:12, Chesnay Schepler <ches...@apache.org> wrote:

> The thing with createLocalEnvironmentWithWebUI is that it requires
> flink-runtime-web to be on the classpath, which is rarely the case when
> running things in the IDE.
> It should work fine in the IDE if you add it as a dependency to your
> project. This should've been logged as a warning.
>
> Chaining is unrelated to this issue as join operators are never chained to
> one another.
> Lambda functions are also not the issue; if they were, the job would fail
> much earlier.
>
> It is reasonable that T3 is blocked if T1 is blocked. T1 gets no input,
> hence produces no output, which now also blocks T3.
>
> There are multiple possible explanations I can come up with:
> * the preceding operators are blocked on something or *really* slow
> * the preceding operators are actually finished, but aren't shutting down
>   due to an implementation error
> * a deadlock in Flink's join logic
> * a deadlock in Flink's network stack
>
> For the first 2 we will have to consult the UI or logs. You said you were
> dumping the input DataSets into files, but were they actually complete?
>
> A deadlock in the network stack should appear as all existing operator
> threads being blocked.
> We can probably rule out a problem with the join logic by removing the
> second join and trying again.
>
>
> On 16.04.2018 03:10, Miguel Coimbra wrote:
>
> Hello,
>
> It would seem that the function which is supposed to launch local mode
> with the web front-end doesn't launch the front-end at all...
> This function seems not to be doing what it is supposed to do, if I'm not
> mistaken:
>
> LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>
> Regarding the preceding operators, the thread dumps I got were pointing to
> a specific set of operations over DataSet instances that were passed into
> my function.
> Below I show the code segment and put the lines where threads are waiting
> in *bold*:
>
> public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(final Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
>     return vertices
>         .joinWithHuge(originalGraph.getEdges())
>         .where(0).equalTo(0)
> *        .with((source, edge) -> edge)* *// Thread 1 is blocked here*
>         .returns(originalGraph.getEdges().getType())
>         .join(vertices)
>         .where(1).equalTo(0)
> *        .with((e, v) -> e) // Thread 3 is blocked here*
>         .returns(originalGraph.getEdges().getType())
>         .distinct(0, 1);
> }
>
> Note: the edges inside the graph originalGraph edge DataSet are much
> greater in number than the elements of the vertices DataSet, so I believe
> that function is being used correctly.
>
> I will try testing with remote (cluster) mode to have access to the web
> front-end, but I have some questions for now:
>
> - The fact that they are blocked in different JoinOperator instances
>   that are chained, is this a result of Flink's default pipeline mechanism?
> - Could there be a problem stemming from the fact they are both waiting on
>   lambdas?
> - I have tried dumping both DataSet variables originalGraph and vertices into
>   files (the ones being used in this code), and they produced correct values
>   (non-empty files), so I don't have a clue what the threads inside Flink's
>   runtime are waiting on.
>
> Thanks for the help so far Chesnay.
>
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>
> ---------- Forwarded message ----------
>
>> From: Chesnay Schepler <ches...@apache.org>
>> To: user@flink.apache.org
>> Cc:
>> Bcc:
>> Date: Sun, 15 Apr 2018 18:54:33 +0200
>> Subject: Re: Unsure how to further debug - operator threads stuck on
>> java.lang.Thread.State: WAITING
>>
>> Hello,
>>
>> Thread #1-3 are waiting for input, Thread #4 is waiting for the job to
>> finish.
>>
>> To further debug this I would look into what the preceding operators are
>> doing, whether they are blocked on something or are emitting records (which
>> you can check in the UI/metrics).
>>
>> On 15.04.2018 18:40, Miguel Coimbra wrote:
>>
>> Hello,
>>
>> I am running into a situation where the Flink threads responsible for my
>> operator execution are all stuck on WAITING mode.
>> Before anything else, this is my machine's spec:
>>
>> Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830 @ 2.13GHz
>> GenuineIntel GNU/Linux
>> 256 GB RAM
>>
>> I am running in local mode on a machine with a considerable amount of
>> memory, so perhaps that may be triggering some execution edge-case?
>>
>> Moving on, this is my Java:
>>
>> openjdk version "1.8.0_151"
>> OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
>> OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
>>
>> Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT with
>> LocalEnvironment on this large-memory machine, with parallelism set to one:
>>
>> Configuration conf = new Configuration();
>> LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>> ExecutionEnvironment env = lenv;
>> env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
>> env.setParallelism(1);
>>
>> This initializes the execution environment for a series of sequential
>> jobs (any data dependency between jobs is flushed to disk on job *i* and
>> read back from disk into a DataSet in job *i + 1*).
>> To reiterate, I am not launching a Flink cluster, I am just executing in
>> local mode from a code base compiled with Maven.
>>
>> I have tested this program via mvn exec:exec with different values of
>> memory (from -Xmx20000m to -Xmx120000m, from 20GB to 120GB) and the
>> result is always the same: the process' memory fills up completely and then
>> the process' CPU usage drops to 0%.
>> This is strange because if it was lack of memory, I would expect an
>> OutOfMemoryError.
>>
>> I have debugged with IntelliJ IDEA and obtained thread dumps from
>> different executions, and realized quite a few operator threads are stuck
>> on java.lang.Thread.State: WAITING.
>>
>> There are four major threads that I find to be in this waiting state.
>> The thread dumps I obtained show me where the wait calls originated:
>>
>> *Number 1:*
>>
>> "CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
>>   java.lang.Thread.State: WAITING
>>     at java.lang.Object.wait(Object.java:-1)
>>     at java.lang.Object.wait(Object.java:502)
>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>     at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>     at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>     at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>>     at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> *Number 2:*
>>
>> "Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
>>   java.lang.Thread.State: WAITING
>>     at java.lang.Object.wait(Object.java:-1)
>>     at java.lang.Object.wait(Object.java:502)
>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>     at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>     at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>     at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>>     at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> *Number 3:*
>>
>> "Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
>>   java.lang.Thread.State: WAITING
>>     at java.lang.Object.wait(Object.java:-1)
>>     at java.lang.Object.wait(Object.java:502)
>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>     at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>     at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>     at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
>>     at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> *Number 4:*
>>
>> "Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
>>   java.lang.Thread.State: WAITING
>>     at sun.misc.Unsafe.park(Unsafe.java:-1)
>>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>>     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
>>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
>>     - locked <0x23eb> (a java.lang.Object)
>>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
>>     at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
>>     at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
>>     at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
>>     at my.package.algorithm.Sample.run(Sample.java:291)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> While I realize these dumps on their own may not be helpful, they at
>> least (as far as I know) indicate that the threads are all waiting on
>> something.
>> But if it was resource scarcity, I believe the program would terminate
>> with an exception.
>> And if it was garbage collection activity, I believe the JVM process
>> would not be at 0% CPU usage.
>>
>> *Note:* I realize I didn't provide the user code that generates the
>> execution plan for Flink which led to the contexts in which the threads are
>> waiting, but I hope it may not be necessary.
>> My problem now is that I am unsure how to proceed to further debug
>> this issue:
>> - The assigned memory is fully used, but there are no exceptions about
>>   lack of memory.
>> - The CPU usage is at 0% and all threads are in a waiting state, but
>>   I don't understand what signal they're waiting for exactly.
>>
>> Hoping anyone might be able to give me a hint.
>>
>> Thank you very much for your time.
>>
>> Best regards,
>>
>> Miguel E. Coimbra
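
A note on the thread dumps quoted above: they were taken from the IntelliJ IDEA debugger. The same kind of information can probably be collected from inside the JVM with the standard java.lang.management API, which may be handy when the job runs via mvn exec:exec with no IDE attached. The following is only a minimal sketch under that assumption; the class name WaitingThreadReport and the method waitingThreads are made up for illustration and are not part of Flink or of the code discussed in this thread.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Flink): lists every live thread that is
// currently in java.lang.Thread.State.WAITING, with its top stack frames.
class WaitingThreadReport {

    /** Returns one formatted entry per WAITING thread, with at most maxFrames frames each. */
    static List<String> waitingThreads(int maxFrames) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> report = new ArrayList<>();
        // false, false: do not collect locked monitors / ownable synchronizers
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info == null || info.getThreadState() != Thread.State.WAITING) {
                continue;
            }
            StringBuilder sb = new StringBuilder();
            sb.append('"').append(info.getThreadName()).append("\" WAITING\n");
            StackTraceElement[] frames = info.getStackTrace();
            for (int i = 0; i < Math.min(maxFrames, frames.length); i++) {
                sb.append("    at ").append(frames[i]).append('\n');
            }
            report.add(sb.toString());
        }
        return report;
    }

    public static void main(String[] args) throws Exception {
        // Demo thread that blocks in Object.wait(), like the operator threads above.
        final Object lock = new Object();
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    // interrupted by main: let the thread finish
                }
            }
        }, "demo-waiter");
        waiter.setDaemon(true);
        waiter.start();
        Thread.sleep(200); // give the demo thread time to reach wait()
        waitingThreads(5).forEach(System.out::println);
        waiter.interrupt();
    }
}
```

Calling waitingThreads from a small watchdog thread started before env.execute() would be one way to log which operator threads are parked once CPU usage drops to 0%; whether that is practical depends on the application.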