Cross is a quadratic operation. As such, it produces very large results even
on moderate inputs, which can easily exceed memory and disk space if the
subsequent operation needs to gather all the data (such as the sort in
your case).

If both inputs are 10 MB of 100-byte elements (100K elements per input),
you end up with 10 billion elements after the cross, which is 1 TB in size
(assuming the result elements are also 100 bytes).
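
The arithmetic above can be checked with a few lines of plain Java. This is
only a back-of-the-envelope sketch: the class and method names are made up
for illustration, and 10 MB is taken as 10,000,000 bytes.

```java
// Back-of-the-envelope size estimate for a Cross (Cartesian product).
// All names here are hypothetical; nothing in this class is Flink API.
final class CrossSizeEstimate {

    // Number of elements in an input of the given size.
    static long elementCount(long inputBytes, long bytesPerElement) {
        return inputBytes / bytesPerElement;
    }

    // A cross emits one result per (left, right) pair.
    static long crossElements(long leftCount, long rightCount) {
        return Math.multiplyExact(leftCount, rightCount);
    }

    // Total result size, assuming every result element has the same size.
    static long crossBytes(long leftCount, long rightCount, long bytesPerResult) {
        return Math.multiplyExact(crossElements(leftCount, rightCount), bytesPerResult);
    }

    public static void main(String[] args) {
        long inputBytes = 10_000_000L; // 10 MB (decimal), an assumption
        long elementBytes = 100L;
        long n = elementCount(inputBytes, elementBytes);
        System.out.println("elements per input:   " + n);
        System.out.println("elements after cross: " + crossElements(n, n));
        System.out.println("bytes after cross:    " + crossBytes(n, n, elementBytes));
    }
}
```

With these numbers the three values are 100,000 elements per input,
10,000,000,000 elements after the cross, and 1,000,000,000,000 bytes (1 TB).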

This is an inherent issue of using a quadratic operation with data that is
too large to be handled by a quadratic operation. There is not much anyone
can do about this.

Try to see whether you can replace the Cross operation with something else
(Join, CoGroup), or whether you can at least filter aggressively after the
Cross before the next operation.
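
To illustrate why filtering right after the Cross helps, here is a plain-Java
stand-in (not Flink API). It still enumerates all n*n pairs, but only the
pairs that pass the predicate are kept for the next stage. The predicate is
modelled on the condition in your cross function; the class name, method
names, and the sample distance matrix are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for "Cross followed by an aggressive filter".
// Nothing here is Flink API; all names are made up for illustration.
final class FilteredCrossSketch {

    // Keep a pair only if the two vertices differ and each is within
    // the distance threshold of the other.
    static boolean withinThreshold(int id1, int[] dist1, int id2, int[] dist2, int threshold) {
        return id1 != id2 && dist1[id2] <= threshold && dist2[id1] <= threshold;
    }

    // Enumerates all ordered pairs (i, j) and returns only the survivors,
    // so whatever comes next (e.g. a sort) sees far fewer than n * n records.
    static List<int[]> filteredCross(int[][] dist, int threshold) {
        List<int[]> kept = new ArrayList<>();
        for (int i = 0; i < dist.length; i++) {
            for (int j = 0; j < dist.length; j++) {
                if (withinThreshold(i, dist[i], j, dist[j], threshold)) {
                    kept.add(new int[] {i, j});
                }
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // dist[i][j] is the distance from vertex i to vertex j (sample data).
        int[][] dist = {
            {0, 1, 5},
            {1, 0, 2},
            {5, 2, 0},
        };
        List<int[]> kept = filteredCross(dist, 2);
        System.out.println("pairs before filter: " + dist.length * dist.length);
        System.out.println("pairs after filter:  " + kept.size());
    }
}
```

The pairing work is still quadratic, but the data handed to the following
operation shrinks to the surviving pairs, which is what matters for the
spilling sort.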


On Mon, Jun 15, 2015 at 2:18 PM, Mihail Vieru <vi...@informatik.hu-berlin.de
> wrote:

>  Hi,
>
> I get the following *"No space left on device" IOException* when using
> the following Cross operator.
> The inputs for the operator are each just *10MB* in size (same input for
> IN1 and IN2; 1000 tuples) and I get the exception after Flink manages to
> fill *50GB* of SSD space and the partition becomes full.
>
> I have found a similar problem in the mailing list here:
>
> https://mail-archives.apache.org/mod_mbox/flink-user/201412.mbox/%3CCAN0XJzNiTyWDfcDLhsP6iJVhpUgnYn0ACy4ueS2R6YSB68Fr%3DA%40mail.gmail.com%3E
>
> As I currently don't have any more free file system space left, specifying
> other temporary folders for Flink is not an option.
> Any ideas on what could help?
>
> I'm using the latest 0.9-SNAPSHOT and run the job in a local execution
> environment.
>
> Best,
> Mihail
>
>
> java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at main(APSPNaiveVernicaJob.java:100))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
>     at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
>     ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:785)
> Caused by: java.io.IOException: No space left on device
>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>     at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>     at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
>     at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(AsynchronousFileIOChannel.java:340)
>     at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:471)
>
>     public static class crossKAPSPFilter implements
>             CrossFunction<Vertex<Integer, Tuple2<Integer[],String>>,
>                     Vertex<Integer, Tuple2<Integer[],String>>,
>                     Tuple2<Integer,String>> {
>
>         @Override
>         public Tuple2<Integer, String> cross(
>                 Vertex<Integer, Tuple2<Integer[], String>> vertex1,
>                 Vertex<Integer, Tuple2<Integer[], String>> vertex2) throws Exception {
>
>             int vertexIdFirst = vertex1.f0;
>             int vertexIdSecond = vertex2.f0;
>             Integer[] vertexDistanceVectorFirst = vertex1.f1.f0;
>             Integer[] vertexDistanceVectorSecond = vertex2.f1.f0;
>
>             if (vertexIdFirst != vertexIdSecond
>                     && vertexDistanceVectorFirst[vertexIdSecond] <= grapDistThreshold
>                     && vertexDistanceVectorSecond[vertexIdFirst] <= grapDistThreshold) {
>                 return new Tuple2<Integer, String>(vertex1.f0, vertex1.f1.f1);
>             }
>             else return null;
>         }
>     }
>
>
