Ah yes, currently when you use that method the UI is started on a random port. I'm fixing that in this PR <https://github.com/apache/flink/pull/5814>, which will be merged today. For now you will have to enable logging and search for something along the lines of "http://<host>:<port> was granted leadership".

Sorry for the inconvenience.
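
If the log is long, the address can be pulled out with a small regular expression. This is only a sketch: the class name LeaderUrlFinder is made up for illustration, and the pattern assumes the log line really contains the wording quoted above ("http://<host>:<port> was granted leadership"), which may differ between Flink versions.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: extracts the web UI address from a single log line. */
final class LeaderUrlFinder {

    // Assumed wording: "http://<host>:<port> was granted leadership".
    // Group 1 is the full URL, group 2 is the port.
    private static final Pattern LEADER_LINE =
            Pattern.compile("(https?://[^\\s:/]+:(\\d+))\\s+was granted leadership");

    /** Returns the URL found in the given log line, if any. */
    static Optional<String> findUrl(String logLine) {
        Matcher m = LEADER_LINE.matcher(logLine);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** Returns the port found in the given log line, if any. */
    static Optional<Integer> findPort(String logLine) {
        Matcher m = LEADER_LINE.matcher(logLine);
        return m.find() ? Optional.of(Integer.parseInt(m.group(2))) : Optional.empty();
    }

    public static void main(String[] args) {
        // The sample line is invented; only the trailing phrase matters.
        String sample = "INFO  SomeEndpoint - http://localhost:45123 was granted leadership";
        System.out.println(findUrl(sample).orElse("no match"));
    }
}
```

Feeding each line of the log output through findUrl and opening the first match in the browser should be enough until the fixed port is available.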

On 16.04.2018 15:04, Miguel Coimbra wrote:
Thanks for the suggestions Chesnay, I will try them out.

However, I have already tried your suggestion with the dependency flink-runtime-web and nothing happened. If I understood you correctly, adding that dependency in the pom.xml would make it so the web front-end is running when I call the following line?

LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

I added flink-runtime-web in my pom.xml, recompiled and launched the program but I simply got "Unable to connect" in my browser (Firefox) on localhost:8081.
Performing wget on localhost:8081 resulted in this:

$ wget localhost:8081
--2018-04-16 12:47:26-- http://localhost:8081/
Resolving localhost (localhost)... ::1, 127.0.0.1
Connecting to localhost (localhost)|::1|:8081... failed: Connection refused.
Connecting to localhost (localhost)|127.0.0.1|:8081... failed: Connection refused.
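
For reference, "Connection refused" from wget normally means that no process accepted the TCP connection on that port. The same check can be scripted in Java, which is handy when the UI port is not known in advance and several candidates need to be tried. A minimal sketch, where PortProbe is a hypothetical name and the one-second timeout is an arbitrary choice:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether anything accepts TCP connections on host:port. */
final class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // "Connection refused" and connect timeouts both end up here.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("localhost:8081 listening: " + isListening("localhost", 8081, 1000));
    }
}
```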

It seems something was bound to localhost:8081 but the connection is not working for some reason.
I probably am skipping some important detail.
These are some of my dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

Have you managed to get the web front-end in local mode?


Best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <mailto:miguel.e.coim...@ist.utl.pt>

On 16 April 2018 at 05:12, Chesnay Schepler <ches...@apache.org <mailto:ches...@apache.org>> wrote:

    The thing with createLocalEnvironmentWithWebUI is that it requires
    flink-runtime-web to be on the classpath, which is rarely the
    case when running things in the IDE.
    It should work fine in the IDE if you add it as a dependency to
    your project. This should've been logged as a warning.
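
Whether a given class is visible on the runtime classpath can be verified from inside the program before the environment is created. The sketch below is generic: ClasspathCheck is a hypothetical name, and the class to probe for is left as a parameter because no particular class name inside flink-runtime-web is asserted here.

```java
/** Hypothetical helper: reports whether a class is visible to this class loader. */
final class ClasspathCheck {

    /** Returns true if the named class can be located, without initializing it. */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the fully qualified name of a class that ships in the jar of interest.
        String name = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

Running this with the same launch configuration as the job (IDE or mvn exec) shows what that particular classpath contains, which can differ from what the pom.xml suggests.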

    Chaining is unrelated to this issue as join operators are never
    chained to one another.
    Lambda functions are also not the issue, if they were the job
    would fail much earlier.

    It is reasonable that T3 is blocked if T1 is blocked. T1 gets no
    input hence produces no output, which now also blocks T3.

    There are multiple possible explanations I can come up with:
    * the preceding operators are blocked on something or really slow
    * the preceding operators are actually finished, but aren't
    shutting down due to an implementation error
    * a deadlock in Flink's join logic
    * a deadlock in Flink's network stack

    For the first 2 we will have to consult the UI or logs. You said
    you were dumping the input DataSets into files, but were they
    actually complete?

    A deadlock in the network stack should appear as all existing
    operator threads being blocked.
    We can probably rule out a problem with the join logic by removing
    the second join and trying again.
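
As a side note, the JVM can report lock-cycle deadlocks on its own through the standard java.lang.management API. The sketch below (DeadlockCheck is a hypothetical name) only detects threads stuck acquiring monitors or ownable synchronizers. Threads that sit in Object.wait() waiting for input would not be reported, so an empty result does not rule out a logical deadlock in the runtime.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: lists threads the JVM itself considers deadlocked. */
final class DeadlockCheck {

    /** Returns the names of threads reported as deadlocked (empty if none). */
    static List<String> deadlockedThreadNames() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        long[] ids = threads.findDeadlockedThreads(); // null when no deadlock is found
        List<String> names = new ArrayList<>();
        if (ids == null) {
            return names;
        }
        for (ThreadInfo info : threads.getThreadInfo(ids)) {
            if (info != null) { // a thread may have terminated in the meantime
                names.add(info.getThreadName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("Deadlocked threads: " + deadlockedThreadNames());
    }
}
```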



    On 16.04.2018 03:10, Miguel Coimbra wrote:
    Hello,

    It would seem that the function which is supposed to launch local
    mode with the web front-end doesn't launch the front-end at all...
    This function seems not to be doing what it is supposed to do, if
    I'm not mistaken:

    LocalEnvironment lenv = (LocalEnvironment)
    ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

    Regarding the preceding operators, the thread dumps I got were
    pointing to a specific set of operations over DataSet instances
    that were passed into my function.
    Below I show the code segment, with comments marking the lines
    where threads are waiting:

    public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(
            final Graph<K, VV, EV> originalGraph,
            final DataSet<Vertex<K, VV>> vertices) {
        return vertices
                .joinWithHuge(originalGraph.getEdges())
                .where(0).equalTo(0)
                .with((source, edge) -> edge) // Thread 1 is blocked here
                .returns(originalGraph.getEdges().getType())
                .join(vertices)
                .where(1).equalTo(0)
                .with((e, v) -> e) // Thread 3 is blocked here
                .returns(originalGraph.getEdges().getType())
                .distinct(0, 1);
    }

    Note: the edges inside the graph originalGraph edge DataSet are
    much greater in number than the elements of the vertices DataSet,
    so I believe that function is being used correctly.
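
To make the intended result of selectEdges easier to check on small inputs, here is a plain-Java sketch of what the two joins followed by distinct(0, 1) appear to compute: the edges whose source and target are both in the vertex set, with one edge kept per (source, target) pair. SimpleEdge and SelectEdgesSketch are stand-ins invented for this illustration, not Gelly types, and the sketch assumes long vertex IDs. It keeps the first edge of each pair, whereas the DataSet version may keep any one of them.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical in-memory equivalent of the join/join/distinct pipeline. */
final class SelectEdgesSketch {

    /** Minimal stand-in for an edge: field 0 = source, field 1 = target, field 2 = value. */
    static final class SimpleEdge {
        final long source;
        final long target;
        final double value;

        SimpleEdge(long source, long target, double value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }
    }

    /** Keeps edges with both endpoints in vertexIds, one per (source, target) pair. */
    static List<SimpleEdge> selectEdges(List<SimpleEdge> edges, Set<Long> vertexIds) {
        Set<String> seenPairs = new HashSet<>();
        List<SimpleEdge> result = new ArrayList<>();
        for (SimpleEdge e : edges) {
            // First join: source must be a selected vertex. Second join: target too.
            boolean bothSelected = vertexIds.contains(e.source) && vertexIds.contains(e.target);
            // distinct(0, 1): only the first edge per (source, target) pair is kept.
            if (bothSelected && seenPairs.add(e.source + "->" + e.target)) {
                result.add(e);
            }
        }
        return result;
    }
}
```

Comparing the output of this sketch with the Flink job on a tiny graph is one way to confirm that the hang is not hiding a logic problem in the plan.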

    I will try testing with remote (cluster) mode to have access to
    the web front-end, but I have some questions for now:

    - The fact that they are blocked in different JoinOperator
    instances that are chained, is this a result of Flink's default
    pipeline mechanism?
    - Could there be a problem stemming from the fact they are both
    waiting on lambdas?
    - I have tried dumping both DataSet variables originalGraph and
    vertices into files (the ones being used in this code), and they
    produced correct values (non-empty files), so I don't have a clue
    what the threads inside Flink's runtime are waiting on.

    Thanks for the help so far Chesnay.


    Miguel E. Coimbra
    Email: miguel.e.coim...@gmail.com
    <mailto:miguel.e.coim...@ist.utl.pt>

    ---------- Forwarded message ----------

        From: Chesnay Schepler <ches...@apache.org
        <mailto:ches...@apache.org>>
        To: user@flink.apache.org <mailto:user@flink.apache.org>
        Cc:
        Bcc:
        Date: Sun, 15 Apr 2018 18:54:33 +0200
        Subject: Re: Unsure how to further debug - operator threads
        stuck on java.lang.Thread.State: WAITING
        Hello,

        Thread #1-3 are waiting for input, Thread #4 is waiting for
        the job to finish.

        To further debug this I would look into what the preceding
        operators are doing, whether they are blocked on something or
        are emitting records (which you can check in the UI/metrics).

        On 15.04.2018 18:40, Miguel Coimbra wrote:
        Hello,

        I am running into a situation where the Flink threads
        responsible for my operator execution are all stuck on
        WAITING mode.
        Before anything else, this is my machine's spec:

        Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830  @
        2.13GHz GenuineIntel GNU/Linux
        256 GB RAM

        I am running in local mode on a machine with a considerable
        amount of memory, so perhaps that may be triggering some
        execution edge-case?

        Moving on, this is my Java:

        openjdk version "1.8.0_151"
        OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
        OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)

        Getting back to the problem: I am currently using Flink
        1.5-SNAPSHOT with LocalEnvironment on this large-memory
        machine, with parallelism set to one:

        Configuration conf = new Configuration();
        LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        ExecutionEnvironment env = lenv;
        env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
        env.setParallelism(1);

        This initializes the execution environment for a series of
        sequential jobs (any data dependency between jobs is flushed
        to disk on job i and read back from disk into a DataSet in
        job i + 1).
        To reiterate, I am not launching a Flink cluster, I am just
        executing in local mode from a code base compiled with Maven.

        I have tested this program via mvn exec:exec with different
        heap sizes (from -Xmx20000m to -Xmx120000m, i.e. roughly 20 GB
        to 120 GB) and the result is always the same: the process's
        memory fills up completely and then its CPU usage drops to 0%.
        This is strange because if it were a lack of memory, I would
        expect an OutOfMemoryError.
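
One way to check whether the heap is really exhausted at the moment the CPU usage drops is to log heap occupancy from inside the program. A minimal sketch using the standard java.lang.management API; the class name HeapReport is hypothetical and nothing in it is Flink-specific:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

/** Hypothetical helper: formats current heap occupancy for logging. */
final class HeapReport {

    /** Returns a one-line summary of used and maximum heap in megabytes. */
    static String describeHeap() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long usedMb = heap.getUsed() / (1024 * 1024);
        // getMax() returns -1 when the maximum is undefined.
        String max = heap.getMax() < 0 ? "undefined" : (heap.getMax() / (1024 * 1024)) + "MB";
        return "heap used=" + usedMb + "MB max=" + max;
    }

    public static void main(String[] args) {
        System.out.println(describeHeap());
    }
}
```

Calling describeHeap() periodically from a background thread while the job runs would show whether the used heap is actually pinned at the -Xmx limit when the stall begins.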

        I have debugged with IntelliJ IDEA and obtained thread dumps
        from different executions, and realized quite a few operator
        threads are stuck on java.lang.Thread.State: WAITING.
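
The same information can also be collected without a debugger attached. The following sketch lists every live thread in WAITING state together with its top stack frame, using only Thread.getAllStackTraces(); WaitingThreads is a hypothetical name and the output format is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: summarizes threads that are currently in WAITING state. */
final class WaitingThreads {

    /** Returns "name @ top frame" for every live thread in WAITING state. */
    static List<String> snapshot() {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            StackTraceElement[] stack = entry.getValue();
            if (thread.getState() == Thread.State.WAITING && stack.length > 0) {
                lines.add(thread.getName() + " @ " + stack[0]);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        for (String line : snapshot()) {
            System.out.println(line);
        }
    }
}
```

Printing such a snapshot at intervals from a helper thread makes it possible to see when the operator threads enter the waiting state, rather than only that they are in it.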

        There are four major threads that I find to be in this
        waiting state.
        The thread dumps I obtained show me where the wait calls
        originated:

        *Number 1:*

        "CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
          java.lang.Thread.State: WAITING
              at java.lang.Object.wait(Object.java:-1)
              at java.lang.Object.wait(Object.java:502)
              at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
              at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
              at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
              at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
              at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
              at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
              at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
              at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
              at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
              at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
              at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
              at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
              at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
              at java.lang.Thread.run(Thread.java:748)


        *Number 2:*

        "Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
          java.lang.Thread.State: WAITING
              at java.lang.Object.wait(Object.java:-1)
              at java.lang.Object.wait(Object.java:502)
              at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
              at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
              at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
              at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
              at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
              at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
              at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
              at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
              at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
              at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
              at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
              at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
              at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
              at java.lang.Thread.run(Thread.java:748)

        *Number 3:*

        "Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
          java.lang.Thread.State: WAITING
              at java.lang.Object.wait(Object.java:-1)
              at java.lang.Object.wait(Object.java:502)
              at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
              at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
              at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
              at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
              at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
              at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
              at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
              at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
              at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
              at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
              at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
              at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
              at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
              at java.lang.Thread.run(Thread.java:748)

        *Number 4:*

        "Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
          java.lang.Thread.State: WAITING
              at sun.misc.Unsafe.park(Unsafe.java:-1)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
              at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
              at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
              at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
              at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
              at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
              at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
              - locked <0x23eb> (a java.lang.Object)
              at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
              at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
              at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
              at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
              at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
              at my.package.algorithm.Sample.run(Sample.java:291)
              at java.lang.Thread.run(Thread.java:748)

        While I realize these dumps on their own may not be helpful,
        they at least (as far as I know) indicate that the threads
        are all waiting on something.
        But if it was resource scarcity I believe the program would
        terminate with an exception.
        And if it was garbage collection activity, I believe the JVM
        process would not be at 0% CPU usage.

        *Note:* I realize I didn't provide the user code that
        generates the execution plan for Flink which led to the
        contexts in which the threads are waiting, but I hope it may
        not be necessary.
        My problem now is that I am unsure how to proceed to
        further debug this issue:
        - The assigned memory is fully used, but there are no
        exceptions about lack of memory.
        - The CPU usage is at 0% and all threads are in a
        waiting state, but I don't understand what signal they're
        waiting for exactly.

        Hoping anyone might be able to give me a hint.

        Thank you very much for your time.

        Best regards,

        Miguel E. Coimbra



