Hi all, thanks for the feedback. For the moment, I hope I resolved the problem by compressing the string into a byte[] using a custom implementation of the Value interface and the LZ4 algorithm. This adds a little overhead in some processing steps, but it should reduce network traffic and the temporary space required on disk.
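For illustration, a value type along these lines could look roughly like the sketch below. The class and method names are hypothetical, not Stefano's actual code, and java.util.zip's Deflater/Inflater stand in for LZ4 so the snippet is self-contained (in practice one would use an LZ4 library such as lz4-java); a real Flink version would additionally implement org.apache.flink.types.Value and serialize the compressed bytes in write()/read().

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/**
 * Sketch of a string value that is stored compressed as a byte[].
 * Deflater/Inflater stand in for LZ4 here; the idea is the same:
 * ship/spill the compressed bytes, decompress only on access.
 */
public class CompressedStringValue {
    private byte[] compressed;   // compressed UTF-8 bytes
    private int originalLength;  // length of the uncompressed bytes

    public CompressedStringValue() {}

    public CompressedStringValue(String value) {
        setValue(value);
    }

    public void setValue(String value) {
        byte[] raw = value.getBytes(StandardCharsets.UTF_8);
        originalLength = raw.length;
        Deflater deflater = new Deflater();
        deflater.setInput(raw);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream(Math.max(raw.length, 32));
        byte[] buffer = new byte[4096];
        while (!deflater.finished()) {
            out.write(buffer, 0, deflater.deflate(buffer));
        }
        deflater.end();
        compressed = out.toByteArray();
    }

    public String getValue() {
        try {
            Inflater inflater = new Inflater();
            inflater.setInput(compressed);
            byte[] raw = new byte[originalLength];
            int n = inflater.inflate(raw);
            inflater.end();
            return new String(raw, 0, n, StandardCharsets.UTF_8);
        } catch (DataFormatException e) {
            throw new RuntimeException("corrupt compressed payload", e);
        }
    }
}
```

In a Flink job the network and spill savings come from the serialized form carrying the compressed bytes (plus the original length) instead of the raw string.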
I think the problem is due to the two joins moving around quite a bit of data: essentially, I join twice something like 230 million tuples with a dataset of 9.2 million entries (~80 GB). Compression seems to be working fine so far, even though I have not reached the critical point yet. I'll keep you posted to let you know whether this workaround solved the problem. I applied the double join as an alternative to repeating 230M*2 single gets on HBase; this allowed the process to complete in about 11 hours.

Thanks a lot to everyone again.

Regards,
Stefano

2014-12-03 18:02 GMT+01:00 Flavio Pompermaier <[email protected]>:

> I think I can answer on behalf of Stefano, who is busy right now. The job
> failed because on the job manager (which is also a task manager) the temp
> folder was full.
> We would like to understand how big the temp directory should be. Which
> parameters should we consider to make that computation?
>
>
> On Wed, Dec 3, 2014 at 5:22 PM, Ufuk Celebi <[email protected]> wrote:
>
>> The task managers log the temporary directories at startup. Can you have
>> a look there and verify that you configured the temporary directories
>> correctly?
>>
>> On Wed, Dec 3, 2014 at 5:17 PM, Stephan Ewen <[email protected]> wrote:
>>
>>> Hi!
>>>
>>> That exception means that one of the directories is full. If you have
>>> several temp directories on different disks, you can add them all to the
>>> config and the temp files will be rotated across the disks.
>>>
>>> The exception may come once the first temp directory is full. For
>>> example, if you have 4 temp dirs (where 1 is rather full while the others
>>> have a lot of space), it may be that one temp file on the full directory
>>> grows large and exceeds the space, while the other directories have plenty
>>> of space.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Wed, Dec 3, 2014 at 4:40 PM, Robert Metzger <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I think Flink is deleting its temporary files.
>>>>
>>>> Is the temp. path set to the SSD on each machine?
>>>> What is the size of the two data sets you are joining? Your cluster
>>>> has 6*256 GB = 1.5 TB of temporary disk space.
>>>> Maybe only the temp directory of one node is full?
>>>>
>>>>
>>>> On Wed, Dec 3, 2014 at 3:52 PM, Ufuk Celebi <[email protected]> wrote:
>>>>
>>>>> Hey Stefano,
>>>>>
>>>>> I would wait for Stephan's take on this, but with caught IOExceptions
>>>>> the hash table should properly clean up after itself and delete the file.
>>>>>
>>>>> Can you still reproduce this problem for your use case?
>>>>>
>>>>> – Ufuk
>>>>>
>>>>> On Tue, Dec 2, 2014 at 7:07 PM, Stefano Bortoli <[email protected]> wrote:
>>>>>
>>>>>> Hi guys,
>>>>>>
>>>>>> a quite long process failed due to this No Space Left on Device
>>>>>> exception, but the machine disk is not full at all.
>>>>>>
>>>>>> okkam@okkam-nano-2:/opt/flink-0.8$ df
>>>>>> Filesystem      1K-blocks     Used  Available Use% Mounted on
>>>>>> /dev/sdb2       223302236 22819504  189116588  11% /
>>>>>> none                    4        0          4   0% /sys/fs/cgroup
>>>>>> udev              8156864        4    8156860   1% /dev
>>>>>> tmpfs             1633520      524    1632996   1% /run
>>>>>> none                 5120        0       5120   0% /run/lock
>>>>>> none              8167584        0    8167584   0% /run/shm
>>>>>> none               102400        0     102400   0% /run/user
>>>>>> /dev/sdb1          523248     3428     519820   1% /boot/efi
>>>>>> /dev/sda1       961302560  2218352  910229748   1% /media/data
>>>>>> cm_processes      8167584    12116    8155468   1% /run/cloudera-scm-agent/process
>>>>>>
>>>>>> Is it possible that the temporary files were deleted 'after the
>>>>>> problem'? I read so, but there was no confirmation. However, it is a
>>>>>> 256 GB SSD disk. Each of the 6 nodes has it.
>>>>>>
>>>>>> Here is the stack trace:
>>>>>>
>>>>>> 16:37:59,581 ERROR org.apache.flink.runtime.operators.RegularPactTask - Error in task code:
>>>>>> CHAIN Join (org.okkam.flink.maintenance.deduplication.consolidate.Join2ToGetCandidates)
>>>>>> -> Filter (org.okkam.flink.maintenance.deduplication.match.SingleMatchFilterFunctionWithFlagMatch)
>>>>>> -> Map (org.okkam.flink.maintenance.deduplication.match.MapToTuple3MapFunction)
>>>>>> -> Combine (org.apache.flink.api.java.operators.DistinctOperator$DistinctFunction) (4/28)
>>>>>> java.io.IOException: The channel is erroneous.
>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.ChannelAccess.checkErroneous(ChannelAccess.java:132)
>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter.writeBlock(BlockChannelWriter.java:73)
>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:218)
>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:204)
>>>>>>     at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>>>>>>     at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
>>>>>>     at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.write(AbstractPagedOutputView.java:173)
>>>>>>     at org.apache.flink.types.StringValue.writeString(StringValue.java:808)
>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:68)
>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:95)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>>>>>>     at org.apache.flink.runtime.operators.hash.HashPartition.insertIntoProbeBuffer(HashPartition.java:269)
>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:474)
>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:537)
>>>>>>     at org.apache.flink.runtime.operators.hash.BuildSecondHashMatchIterator.callWithNextKey(BuildSecondHashMatchIterator.java:106)
>>>>>>     at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>>>>>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:246)
>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.io.IOException: No space left on device
>>>>>>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>>>>>>     at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
>>>>>>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>>>>>>     at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>>>>>>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(BlockChannelAccess.java:259)
>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.IOManager$WriterThread.run(IOManager.java:636)
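For reference, Stephan's suggestion in the thread above (list several temp directories on different disks so Flink rotates temp files across them) is a one-line config change. A sketch of what this could look like in flink-conf.yaml, assuming the taskmanager.tmp.dirs key with colon-separated paths; the directories here are placeholders, not the actual cluster layout:

```yaml
# flink-conf.yaml -- example only; the paths are placeholders.
# Multiple directories are separated by ':'; Flink rotates spill
# files across them, spreading disk usage and I/O over the disks.
taskmanager.tmp.dirs: /media/data/flink-tmp:/tmp/flink-tmp
```

For sizing, a rough upper bound is the serialized size of the data sets a join or sort may need to spill per node, which is why putting a large partition such as /media/data in the list helps more than the smaller root disk.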
