JAXB and Serialization are necessary for my business logic. I store data as byte[] which are plain serializations of XML Strings. At every read I have to rebuild the objects using JAXB.
Kryo in Flink will make it easier to manage user-defined objects, I guess.

saluti,
Stefano

2014-12-04 12:41 GMT+01:00 Stephan Ewen <[email protected]>:

> Hi Stefano!
>
> Good to hear that it is working for you!
>
> Just a heads up: Flink is not using JAXB or any other Java Serialization
> for its data exchange, only to deploy functions into the cluster (which is
> usually very fast). When we send records around, we have a special
> serialization stack that is absolutely competitive with Kryo on
> serialization speed. We are thinking of using Kryo, though, to deploy
> functions into the cluster in the future, to work around some of the
> constraints that the Java serialization has.
>
> Greetings,
> Stephan
>
>
> On Thu, Dec 4, 2014 at 8:48 AM, Stefano Bortoli <[email protected]> wrote:
>
>> The process was completed in about 6h45m, much less than the previous
>> one. The longest time is still taken by the 'blocking part'. I guess we
>> could just increase the redundancy of the SolrCloud indexes and reach
>> amazing performance. Furthermore, we did not apply any 'key
>> transformation' (reversing or generating a Long as ID), so we have further
>> margin for improvement. I also have the feeling that relying on
>> Kryo serialization to build the POJOs rather than old-school JAXB
>> marshalling/unmarshalling would give quite a boost, as we repeat the
>> operation at least 250M times. :-)
>>
>> Thanks a lot to everyone. Flink is making effective deduplication
>> possible on a very heterogeneous dataset of about 10M entries within
>> hours on a cluster of 6 cheap hardware nodes. :-)
>>
>> saluti,
>> Stefano
>>
>> 2014-12-03 18:31 GMT+01:00 Stefano Bortoli <[email protected]>:
>>
>>> Hi all,
>>>
>>> thanks for the feedback. For the moment, I hope I resolved the problem
>>> by compressing the string into a byte[] using a custom implementation of
>>> the Value interface and the LZ4 algorithm. I have a little overhead in
>>> the processing of some steps, but it should reduce network traffic and
>>> the required temporary space on disk.
>>>
>>> I think the problem is due to the two joins moving around quite a bit of
>>> data. Essentially I join twice something like 230 million tuples with a
>>> dataset of 9.2 million entries (~80GB). Compression seems to be working
>>> fine so far, even though I did not reach the critical point yet. I'll
>>> keep you posted to let you know whether this workaround solved the
>>> problem.
>>>
>>> I applied the double join as an alternative to repeating 230M*2 single
>>> gets on HBase. Still, this allowed us to complete the process in about
>>> 11h.
>>>
>>> thanks a lot to everyone again.
>>>
>>> saluti,
>>> Stefano
>>>
>>> 2014-12-03 18:02 GMT+01:00 Flavio Pompermaier <[email protected]>:
>>>
>>>> I think I can answer on behalf of Stefano, who is busy right now: the
>>>> job failed because on the job manager (which is also a task manager)
>>>> the temp folder was full.
>>>> We would like to understand how big the temp directory should be.
>>>> Which parameters should we consider to make that computation?
>>>>
>>>> On Wed, Dec 3, 2014 at 5:22 PM, Ufuk Celebi <[email protected]> wrote:
>>>>
>>>>> The task managers log the temporary directories at start up. Can you
>>>>> have a look there and verify that you configured the temporary
>>>>> directories correctly?
>>>>>
>>>>> On Wed, Dec 3, 2014 at 5:17 PM, Stephan Ewen <[email protected]> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> That exception means that one of the directories is full. If you
>>>>>> have several temp directories on different disks, you can add them
>>>>>> all to the config and the temp files will be rotated across the
>>>>>> disks.
>>>>>>
>>>>>> The exception may come once the first temp directory is full.
>>>>>> For example, if you have 4 temp dirs (where 1 is rather full while
>>>>>> the others have a lot of space), it may be that one temp file on the
>>>>>> full directory grows large and exceeds the space, while the other
>>>>>> directories have plenty of space.
>>>>>>
>>>>>> Greetings,
>>>>>> Stephan
>>>>>>
>>>>>> On Wed, Dec 3, 2014 at 4:40 PM, Robert Metzger <[email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I think Flink is deleting its temporary files.
>>>>>>>
>>>>>>> Is the temp path set to the SSD on each machine?
>>>>>>> What is the size of the two data sets you are joining? Your cluster
>>>>>>> has 6*256GB = 1.5 TB of temporary disk space.
>>>>>>> Maybe only the temp directory of one node is full?
>>>>>>>
>>>>>>> On Wed, Dec 3, 2014 at 3:52 PM, Ufuk Celebi <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hey Stefano,
>>>>>>>>
>>>>>>>> I would wait for Stephan's take on this, but with caught
>>>>>>>> IOExceptions the hash table should properly clean up after itself
>>>>>>>> and delete the file.
>>>>>>>>
>>>>>>>> Can you still reproduce this problem for your use case?
>>>>>>>>
>>>>>>>> – Ufuk
>>>>>>>>
>>>>>>>> On Tue, Dec 2, 2014 at 7:07 PM, Stefano Bortoli <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi guys,
>>>>>>>>>
>>>>>>>>> a quite long process failed due to this "No space left on device"
>>>>>>>>> exception, but the machine disk is not full at all.
>>>>>>>>>
>>>>>>>>> okkam@okkam-nano-2:/opt/flink-0.8$ df
>>>>>>>>> Filesystem      1K-blocks     Used Available Use% Mounted on
>>>>>>>>> /dev/sdb2       223302236 22819504 189116588  11% /
>>>>>>>>> none                    4        0         4   0% /sys/fs/cgroup
>>>>>>>>> udev              8156864        4   8156860   1% /dev
>>>>>>>>> tmpfs             1633520      524   1632996   1% /run
>>>>>>>>> none                 5120        0      5120   0% /run/lock
>>>>>>>>> none              8167584        0   8167584   0% /run/shm
>>>>>>>>> none               102400        0    102400   0% /run/user
>>>>>>>>> /dev/sdb1          523248     3428    519820   1% /boot/efi
>>>>>>>>> /dev/sda1       961302560  2218352 910229748   1% /media/data
>>>>>>>>> cm_processes      8167584    12116   8155468   1% /run/cloudera-scm-agent/process
>>>>>>>>>
>>>>>>>>> Is it possible that the temporary files were deleted 'after the
>>>>>>>>> problem'? I read so, but there was no confirmation. However, it is
>>>>>>>>> a 256GB SSD disk. Each of the 6 nodes has one.
>>>>>>>>>
>>>>>>>>> Here is the stack trace:
>>>>>>>>>
>>>>>>>>> 16:37:59,581 ERROR org.apache.flink.runtime.operators.RegularPactTask - Error in
>>>>>>>>> task code: CHAIN Join (org.okkam.flink.maintenance.deduplication.consolidate.Join2ToGetCandidates)
>>>>>>>>> -> Filter (org.okkam.flink.maintenance.deduplication.match.SingleMatchFilterFunctionWithFlagMatch)
>>>>>>>>> -> Map (org.okkam.flink.maintenance.deduplication.match.MapToTuple3MapFunction)
>>>>>>>>> -> Combine (org.apache.flink.api.java.operators.DistinctOperator$DistinctFunction) (4/28)
>>>>>>>>> java.io.IOException: The channel is erroneous.
>>>>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.ChannelAccess.checkErroneous(ChannelAccess.java:132)
>>>>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter.writeBlock(BlockChannelWriter.java:73)
>>>>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:218)
>>>>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:204)
>>>>>>>>>     at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>>>>>>>>>     at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
>>>>>>>>>     at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.write(AbstractPagedOutputView.java:173)
>>>>>>>>>     at org.apache.flink.types.StringValue.writeString(StringValue.java:808)
>>>>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:68)
>>>>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:95)
>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>>>>>>>>>     at org.apache.flink.runtime.operators.hash.HashPartition.insertIntoProbeBuffer(HashPartition.java:269)
>>>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:474)
>>>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:537)
>>>>>>>>>     at org.apache.flink.runtime.operators.hash.BuildSecondHashMatchIterator.callWithNextKey(BuildSecondHashMatchIterator.java:106)
>>>>>>>>>     at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>>>>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>>>>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>>>>>>>>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:246)
>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>> Caused by: java.io.IOException: No space left on device
>>>>>>>>>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>>>>>>>>>     at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
>>>>>>>>>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>>>>>>>>>     at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>>>>>>>>>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
>>>>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(BlockChannelAccess.java:259)
>>>>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.IOManager$WriterThread.run(IOManager.java:636)
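The rebuild-on-every-read pattern from the top of the thread looks roughly like this. JAXB itself is not shown (it needs annotated classes); the JDK's built-in DOM parser stands in so the sketch is self-contained, and the element and attribute names are invented for the example:

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;

public class XmlRebuild {
    // The stored record is just the bytes of an XML string; every read
    // re-parses it to get an object back (with JAXB this would be an
    // Unmarshaller call instead of a DOM parse).
    static String readName(byte[] xmlBytes) throws Exception {
        DocumentBuilder builder =
                DocumentBuilderFactory.newInstance().newDocumentBuilder();
        Document doc = builder.parse(new ByteArrayInputStream(xmlBytes)); // full rebuild per read
        return doc.getDocumentElement().getAttribute("name");
    }

    public static void main(String[] args) throws Exception {
        byte[] stored = "<entity name=\"acme\"/>".getBytes(StandardCharsets.UTF_8);
        System.out.println(readName(stored));
    }
}
```

Repeating this parse 250M times is exactly the cost Stefano hopes a binary serializer like Kryo would avoid.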
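Stefano's workaround, compressing the XML string into a byte[] inside a custom Value, follows a compress-on-write / decompress-on-read shape. A self-contained sketch using the JDK's Deflater in place of LZ4 (the actual implementation wraps equivalent logic inside Flink's Value interface, omitted here):

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class CompressedString {
    // Compress the string before it is shipped or spilled to disk.
    static byte[] compress(String s) {
        Deflater deflater = new Deflater();
        deflater.setInput(s.getBytes(StandardCharsets.UTF_8));
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!deflater.finished()) {
            out.write(buf, 0, deflater.deflate(buf));
        }
        deflater.end();
        return out.toByteArray();
    }

    // Decompress on read, paying a small CPU overhead per access.
    static String decompress(byte[] compressed) throws Exception {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!inflater.finished()) {
            out.write(buf, 0, inflater.inflate(buf));
        }
        inflater.end();
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 50; i++) sb.append("<entity><name>acme</name></entity>");
        String xml = sb.toString();
        byte[] packed = compress(xml);
        System.out.println(packed.length + " vs " + xml.getBytes(StandardCharsets.UTF_8).length);
        System.out.println(decompress(packed).equals(xml)); // round-trip check
    }
}
```

Repetitive XML compresses very well, which is why this trades a little per-step CPU for much less network traffic and temp space.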
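The multi-directory rotation Stephan describes is set through the task manager's temp-dir option in flink-conf.yaml; in the Flink 0.8 line this was, to the best of my recollection, `taskmanager.tmp.dirs` (verify against your version's configuration reference), with several paths separated by the system path separator. The paths below are hypothetical:

```yaml
# flink-conf.yaml — spread temp files across two disks so one full
# directory does not fail the job (paths are examples only)
taskmanager.tmp.dirs: /media/data/flink-tmp:/tmp/flink-tmp
```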
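As a rough starting point for Flavio's sizing question: a spilling hash join may, in the worst case, write both the build side and the probe side to disk. A back-of-envelope sketch using the figures from the thread; the ~100 bytes per probe tuple is an assumption, not a number from the thread:

```java
public class TempSpaceEstimate {
    // Worst case for a spilling hash join: the whole build side plus the
    // whole probe side end up in the temp directories.
    static long worstCaseSpillBytes(long buildSideBytes, long probeTuples, long bytesPerTuple) {
        return buildSideBytes + probeTuples * bytesPerTuple;
    }

    public static void main(String[] args) {
        long buildSideBytes = 80L << 30;     // ~80 GB dataset mentioned in the thread
        long probeTuples = 230_000_000L;     // ~230M tuples mentioned in the thread
        long bytesPerTuple = 100L;           // assumed average tuple size
        long perJoin = worstCaseSpillBytes(buildSideBytes, probeTuples, bytesPerTuple);
        // The job runs the join twice, so budget roughly twice that
        // across all temp dirs in the cluster.
        System.out.printf("per join: ~%d GiB, two joins: ~%d GiB%n",
                perJoin >> 30, (2 * perJoin) >> 30);
    }
}
```

Divided over 6 nodes this fits comfortably on the 256GB SSDs, but a single skewed or misconfigured temp dir can still overflow, which matches the failure seen here.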
