Hi all,

thanks for the feedback. For the moment, I hope I resolved the problem by
compressing the string into a byte[] using a custom implementation of the
Value interface and the LZ4 algorithm. This adds a little overhead to some
processing steps, but it should reduce network traffic and the temporary
space required on disk.
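For reference, a minimal self-contained sketch of the compress/decompress round trip (class and method names are hypothetical; the real code wraps this in a custom Flink Value and uses an LZ4 library, which is not in the JDK, so JDK Deflater stands in here):

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical sketch: compress a string field to byte[] before it is
// serialized and shipped, and restore it after deserialization. JDK
// Deflater/Inflater stand in for LZ4 to keep the example self-contained.
public class CompressedStringSketch {

    // Compress a string to a byte[] (what the custom Value would write).
    public static byte[] compress(String s) {
        byte[] input = s.getBytes(StandardCharsets.UTF_8);
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!deflater.finished()) {
            out.write(buf, 0, deflater.deflate(buf));
        }
        deflater.end();
        return out.toByteArray();
    }

    // Restore the original string (what the custom Value would read).
    public static String decompress(byte[] data) {
        Inflater inflater = new Inflater();
        inflater.setInput(data);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        try {
            while (!inflater.finished()) {
                out.write(buf, 0, inflater.inflate(buf));
            }
        } catch (DataFormatException e) {
            throw new RuntimeException("corrupt compressed payload", e);
        }
        inflater.end();
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}
```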

I think the problem is due to the two joins moving around quite a bit of
data. Essentially I join twice something like 230 million tuples with a
dataset of 9.2 million entries (~80GB). Compression seems to be working
fine so far, even though I have not reached the critical point yet. I'll
keep you posted on whether this workaround solves the problem.

I applied a double join as an alternative to repeating 230M*2 single gets
on HBase. This allowed the process to complete in about 11h.

thanks a lot to everyone again.

saluti,
Stefano

2014-12-03 18:02 GMT+01:00 Flavio Pompermaier <[email protected]>:

> I think I can answer on behalf of Stefano, who is busy right now: the job
> failed because the temp folder on the job manager (which is also a task
> manager) was full.
> We would like to understand how big the temp directory should be. Which
> parameters should we consider to make that computation?
>
>
> On Wed, Dec 3, 2014 at 5:22 PM, Ufuk Celebi <[email protected]> wrote:
>
>> The task managers log the temporary directories at start up. Can you have
>> a look there and verify that you configured the temporary directories
>> correctly?
>>
>> On Wed, Dec 3, 2014 at 5:17 PM, Stephan Ewen <[email protected]> wrote:
>>
>>> Hi!
>>>
>>> That exception means that one of the directories is full. If you have
>>> several temp directories on different disks, you can add them all to the
>>> config and the temp files will be rotated across the disks.
>>>
>>> The exception is thrown as soon as any single temp directory fills up. For
>>> example, if you have 4 temp dirs where one is rather full and the others
>>> have a lot of space, a temp file on the full directory may grow large and
>>> exhaust that disk, even though the other directories still have plenty
>>> of space.
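>>>
>>> A minimal flink-conf.yaml sketch of such a setup (the paths are
>>> hypothetical; separate directories with ':' on Unix):
>>>
>>> ```yaml
>>> # Temp files are rotated across these directories.
>>> taskmanager.tmp.dirs: /mnt/disk1/flink-tmp:/mnt/disk2/flink-tmp:/mnt/disk3/flink-tmp
>>> ```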
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Wed, Dec 3, 2014 at 4:40 PM, Robert Metzger <[email protected]>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I think Flink is deleting its temporary files.
>>>>
>>>> Is the temp. path set to the SSD on each machine?
>>>> What is the size of the two data sets you are joining? Your cluster
>>>> has 6*256GB = 1.5 TB of temporary disk space.
>>>> Maybe only the temp directory of one node is full?
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Dec 3, 2014 at 3:52 PM, Ufuk Celebi <[email protected]> wrote:
>>>>
>>>>> Hey Stefano,
>>>>>
>>>>> I would wait for Stephan's take on this, but with caught IOExceptions
>>>>> the hash table should properly clean up after itself and delete the file.
>>>>>
>>>>> Can you still reproduce this problem for your use case?
>>>>>
>>>>> – Ufuk
>>>>>
>>>>>
>>>>> On Tue, Dec 2, 2014 at 7:07 PM, Stefano Bortoli <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi guys,
>>>>>>
>>>>>> a fairly long process failed with a "No space left on device"
>>>>>> exception, but the machine's disk is not full at all.
>>>>>>
>>>>>> okkam@okkam-nano-2:/opt/flink-0.8$ df
>>>>>> Filesystem     1K-blocks     Used Available Use% Mounted on
>>>>>> /dev/sdb2      223302236 22819504 189116588  11% /
>>>>>> none                   4        0         4   0% /sys/fs/cgroup
>>>>>> udev             8156864        4   8156860   1% /dev
>>>>>> tmpfs            1633520      524   1632996   1% /run
>>>>>> none                5120        0      5120   0% /run/lock
>>>>>> none             8167584        0   8167584   0% /run/shm
>>>>>> none              102400        0    102400   0% /run/user
>>>>>> /dev/sdb1         523248     3428    519820   1% /boot/efi
>>>>>> /dev/sda1      961302560  2218352 910229748   1% /media/data
>>>>>> cm_processes     8167584    12116   8155468   1%
>>>>>> /run/cloudera-scm-agent/process
>>>>>>
>>>>>> Is it possible that the temporary files were deleted 'after the
>>>>>> problem'? I read so, but there was no confirmation. In any case, it is a
>>>>>> 256GB SSD disk, and each of the 6 nodes has one.
>>>>>>
>>>>>> Here is the stack trace:
>>>>>>
>>>>>> 16:37:59,581 ERROR
>>>>>> org.apache.flink.runtime.operators.RegularPactTask            - Error in
>>>>>> task code:  CHAIN Join
>>>>>> (org.okkam.flink.maintenance.deduplication.consolidate.Join2ToGetCandidates)
>>>>>> -> Filter
>>>>>> (org.okkam.flink.maintenance.deduplication.match.SingleMatchFilterFunctionWithFlagMatch)
>>>>>> -> Map
>>>>>> (org.okkam.flink.maintenance.deduplication.match.MapToTuple3MapFunction) 
>>>>>> ->
>>>>>> Combine(org.apache.flink.api.java.operators.DistinctOperator$DistinctFunction)
>>>>>> (4/28)
>>>>>> java.io.IOException: The channel is erroneous.
>>>>>>     at
>>>>>> org.apache.flink.runtime.io.disk.iomanager.ChannelAccess.checkErroneous(ChannelAccess.java:132)
>>>>>>     at
>>>>>> org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter.writeBlock(BlockChannelWriter.java:73)
>>>>>>     at
>>>>>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:218)
>>>>>>     at
>>>>>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:204)
>>>>>>     at
>>>>>> org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>>>>>>     at
>>>>>> org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
>>>>>>     at
>>>>>> org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.write(AbstractPagedOutputView.java:173)
>>>>>>     at
>>>>>> org.apache.flink.types.StringValue.writeString(StringValue.java:808)
>>>>>>     at
>>>>>> org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:68)
>>>>>>     at
>>>>>> org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
>>>>>>     at
>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:95)
>>>>>>     at
>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>>>>>>     at
>>>>>> org.apache.flink.runtime.operators.hash.HashPartition.insertIntoProbeBuffer(HashPartition.java:269)
>>>>>>     at
>>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:474)
>>>>>>     at
>>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:537)
>>>>>>     at
>>>>>> org.apache.flink.runtime.operators.hash.BuildSecondHashMatchIterator.callWithNextKey(BuildSecondHashMatchIterator.java:106)
>>>>>>     at
>>>>>> org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>>>>>>     at
>>>>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>>>>>>     at
>>>>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>>>>>>     at
>>>>>> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:246)
>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.io.IOException: No space left on device
>>>>>>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>>>>>>     at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
>>>>>>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>>>>>>     at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>>>>>>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
>>>>>>     at
>>>>>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(BlockChannelAccess.java:259)
>>>>>>     at
>>>>>> org.apache.flink.runtime.io.disk.iomanager.IOManager$WriterThread.run(IOManager.java:636)
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
