Hello,

Implementing collect() in Python is not trivial, and the gain is questionable. There is an inherent size limit (think 10 MB), and it is
a bit at odds with the deployment model of the Python API.

Something easier would be to execute each iteration of the for-loop as a separate job and save the result in a file. Note that right now the Python API can't execute multiple jobs from the same file; we would need some modifications
in the PythonPlanBinder to allow this.
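
A minimal sketch of that idea, assuming a hypothetical run_job(s_in, s_out, atom_out) callable that submits one plan and blocks until it finishes. Neither run_job nor the file names are part of the Flink Python API; they only illustrate how "S" could be handed from one job to the next through files:

```python
import os


def run_as_separate_jobs(iterations, workdir, run_job):
    """Run one job per loop iteration, passing "S" between jobs via files.

    run_job(s_in, s_out, atom_out) is a hypothetical callable: it reads the
    current "S" from s_in, writes the updated "S" to s_out and the learned
    dictionary atom to atom_out.
    """
    s_path = os.path.join(workdir, "S_0")  # assumed location of the initial "S"
    atom_paths = []
    for i in range(iterations):
        s_next = os.path.join(workdir, "S_%d" % (i + 1))
        atom_path = os.path.join(workdir, "atom_%d" % i)
        run_job(s_path, s_next, atom_path)  # one small plan per iteration
        atom_paths.append(atom_path)
        s_path = s_next  # the next job starts from the file just written
    return s_path, atom_paths
```

Each job then only contains the operations of a single iteration, so the plan the optimizer sees should stay the same size no matter how many atoms are learned.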

Regards,
Chesnay

On 20.11.2016 23:54, Geoffrey Mon wrote:
Hello,

I know that the reuse of the data set in my plan is causing the problem (after one dictionary atom is learned using the data set "S", "S" is updated for use with the next dictionary atom). When I comment out the line updating the data set "S", I have no problem and the plan processing phase takes substantially less time.

I assume that this is because updating and reusing "S" makes the graph of transformations much more complicated and forces the optimizer to do much more work, since for example the final value of "S" depends on all previous operations combined. Is there a way to replace the for loop in my plan so that I don't cause this complication and so that memory usage is manageable? I considered making "S" an iterative data set, but I need to save each dictionary atom to a file, and I wouldn't be able to do that if "S" was iterative and not finalized.
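
To illustrate what I mean, here is a toy model of the plan (plain Python, not Flink code; the Node class and build_plan function are made up for this example). Each loop iteration adds an atom that depends on the current "S" and a new "S" that depends on both, so the chain of dependencies behind the final "S" keeps getting longer:

```python
class Node:
    """Toy stand-in for a lazily evaluated data set; not a Flink class."""

    def __init__(self, name, parents=()):
        self.name = name
        self.parents = tuple(parents)
        # Longest chain of operations that must run before this node.
        self.depth = 1 + max((p.depth for p in self.parents), default=0)


def build_plan(iterations):
    """Mimic the for loop: learn an atom from S, then update S with it."""
    s = Node("S0")
    atoms = []
    for i in range(iterations):
        atom = Node("atom%d" % i, [s])
        atoms.append(atom)
        s = Node("S%d" % (i + 1), [s, atom])
    return s, atoms


final_s, atoms = build_plan(20)
print(final_s.depth)  # 2 * 20 + 1 = 41 chained operations
```

In this model the depth grows linearly with the number of atoms, and every atom sink drags the whole history of "S" along with it.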

Perhaps I would be able to collect "S" at the end of each dictionary atom and then make the new "S" directly from these values. This however would require that "collect" be implemented in the Python API.

In addition, I don't think the problem is YARN-specific anymore because I have been able to reproduce it on a local machine.

Cheers,
Geoffrey

On Mon, Nov 14, 2016 at 11:38 AM Geoffrey Mon <geof...@gmail.com> wrote:

    Hi Ufuk,

    The master instance of the cluster was also a m3.xlarge instance
    with 15 GB RAM, which I would've expected to be enough. I have
    gotten the program to run successfully on a personal virtual
    cluster where each node has 8 GB RAM and where the master node was
    also a worker node, so the problem appears to have something to do
    with YARN's memory behavior (such as on EMR).

    Nevertheless, it would probably be a good idea to modify my code
    to reduce its memory usage. When running my code on my local
    cluster, performance was probably bottlenecked.

    The job does use a for loop to run the core operations for a
    specific number of times, specified as a command line parameter.
    If it helps, here is my code:
    Python:
    https://github.com/quinngroup/pyflink-r1dl/blob/master/R1DL_Flink.py
    (L260 is the core for loop)
    Java:
    https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
    (L120 is the core for loop)
    I would expect the join operations to be a big cause of the
    excessive memory usage.

    Thanks!

    Geoffrey


    On Mon, Nov 14, 2016 at 5:13 AM Ufuk Celebi <u...@apache.org> wrote:

        The Python API is in alpha state currently, so we would have
        to check if it is related specifically to that. Looping in
        Chesnay who worked on that.

        The JVM GC error happens on the client side as that's where
        the optimizer runs. How much memory does the client submitting
        the job have?

        How do you compose the job? Do you have nested loops, e.g.
        for() { ... bulk iteration Flink program }?

        – Ufuk

        On 14 November 2016 at 08:02:26, Geoffrey Mon
        (geof...@gmail.com) wrote:
        > Hello all,
        >
        > I have a pretty complicated plan file using the Flink Python API
        > running on an AWS EMR cluster of m3.xlarge instances using YARN.
        > The plan is for a dictionary learning algorithm and has to run a
        > sequence of operations many times; each sequence involves bulk
        > iterations with join operations and other more intensive
        > operations, and depends on the results of the previous sequence.
        > I have found that when the number of times to run this sequence
        > of operations is high (e.g. 20) I get this exception:
        >
        > Uncaught error from thread
        > [flink-akka.remote.default-remote-dispatcher-7] shutting down JVM
        > since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
        > java.lang.StackOverflowError
        > at java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
        > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
        > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        > at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        > ..............................
        >
        > I assume this problem is caused by having to send too many
        > serialized operations between Java and Python. When using a Java
        > implementation of the same operations, I also get:
        >
        > java.lang.OutOfMemoryError: GC overhead limit exceeded
        > at org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:106)
        > at org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:99)
        > at org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:90)
        > at org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:69)
        > at org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties.instantiate(HashJoinBuildSecondProperties.java:81)
        > at org.apache.flink.optimizer.dag.TwoInputNode.instantiate(TwoInputNode.java:607)
        > ............
        >
        > The problem seems to be caused by YARN's handling of memory,
        > because I have gotten the same Python implementation to work on a
        > smaller, local virtual cluster that is not using YARN, even though
        > my local cluster has far fewer computing resources than the 15 GB
        > RAM m3.xlarge AWS instances that EMR is using. After the YARN job
        > has failed, sometimes a python process is left on the cluster
        > using up most of the RAM.
        >
        > How can I solve this issue? I am unsure of how to reduce the
        > number of operations while keeping the same functionality.
        >
        > Thanks,
        > Geoffrey
        >

