Hey Martin, I'm still on it. I have switched to analyzing the flink-runtime tests, as I observe similar divergence there. I'm not sure how long it'll take, but if I find something I'll make sure to let you all know :)
Robert

On Sat, Apr 29, 2017 at 3:12 PM, Martin Eden <martineden...@gmail.com> wrote:

Hi Robert,

Any updates on the below for the community?

Thanks,
M

On Tue, Apr 25, 2017 at 8:50 AM, Robert Schmidtke <ro.schmid...@gmail.com> wrote:

Hi Ufuk, thanks for coming back to me on this.

The records are 100 bytes in size, the benchmark being TeraSort, so that should not be an issue. I have played around with the input size, and here are my observations:

128 GiB input: no spilling in Flink.
256 GiB input: 88 GiB of spilling in Flink (so 88 GiB of reads, 88 GiB of writes), and my instrumentation covers all of it.
384 GiB input: 391 GiB of spilling in Flink, and I cover all of it.
512 GiB input: 522 GiB of spilling in Flink, but I miss 140 GiB of it.
640 GiB input: 653 GiB of spilling in Flink, but I miss 281 GiB of it.
768 GiB input: 784 GiB of spilling in Flink, but I miss 490 GiB of it.
896 GiB input: 914 GiB of spilling in Flink, but I miss 662 GiB of it.
1024 GiB input: 1045 GiB of spilling in Flink, but I miss 968 GiB of it.

So regardless of how well my system is configured and whether spilling is even necessary, it seems that with larger spilling volumes the way the data is spilled changes, and I start missing larger and larger portions of the I/O, up to almost 100%.

Since I have written the instrumentation myself, I cannot guarantee that it is flawless and I might have missed something. I'm currently looking into how the file channels are accessed in parallel by multiple threads; I cover that as well and my tests verify it, but maybe there are special access patterns here.

Robert

On Mon, Apr 24, 2017 at 2:25 PM, Ufuk Celebi <u...@apache.org> wrote:

Hey Robert,

For batch, that should cover the relevant spilling code. If the records are >= 5 MB, the SpillingAdaptiveSpanningRecordDeserializer will spill incoming records as well. But that should be covered by the FileChannel instrumentation, too?

– Ufuk

On Tue, Apr 18, 2017 at 3:57 PM, Robert Schmidtke <ro.schmid...@gmail.com> wrote:

Hi,

I have already looked at the UnilateralSortMerger, concluding that all I/O eventually goes via SegmentReadRequest and SegmentWriteRequest (which in turn use java.nio.channels.FileChannel) in AsynchronousFileIOChannel. Are there more interaction points between Flink and the underlying file system that I might want to consider?

Thanks!
Robert

On Fri, Apr 7, 2017 at 5:02 PM, Kurt Young <ykt...@gmail.com> wrote:

Hi,

You probably want to check out UnilateralSortMerger.java; this is the class responsible for Flink's external sort. Here is a short description of how it works: there are three threads working together, one for reading, one for sorting partial data in memory, and the last one responsible for spilling. Flink first figures out how much memory it can use during the in-memory sort and manages it as MemorySegments. Once this memory runs out, the sorting thread takes over the memory and does the in-memory sorting (for more details on the in-memory sort, see NormalizedKeySorter). After that, the spilling thread writes the sorted data to disk and makes the memory available for reading again. This is repeated until all data has been processed. Normally the data is read twice (once from the source, once from disk) and written once, but if too many files were spilled, Flink will first merge some of the files so that the final merge step does not exceed a certain fan-in limit (128 by default). Hope this helps you.

Best,
Kurt
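To make the cycle described above concrete, here is a minimal, self-contained Java sketch of one sort-and-spill round. Everything in it is invented for illustration and it is not Flink's actual code: in Flink, the reading, sorting, and spilling threads of UnilateralSortMerger share this work, the memory is managed as MemorySegments, the in-memory sort is done by NormalizedKeySorter, and the writes go through Flink's asynchronous I/O channels rather than a directly opened FileChannel.

    // Hypothetical sketch of one sort-and-spill round; all names are invented.
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;
    import java.util.Arrays;
    import java.util.Random;

    public class ExternalSortSketch {

        static final int RECORD_LEN = 100; // TeraSort-style fixed-size records

        // Sort one batch of records in memory, then spill it to a temporary run file.
        // The FileChannel.write() calls are what file-level instrumentation of the
        // spill phase would be expected to observe.
        static Path sortAndSpill(byte[][] records) throws IOException {
            Arrays.sort(records, Arrays::compareUnsigned); // in-memory sort (Java 9+)
            Path run = Files.createTempFile("sort-run-", ".bin");
            try (FileChannel ch = FileChannel.open(run, StandardOpenOption.WRITE)) {
                ByteBuffer buf = ByteBuffer.allocate(RECORD_LEN);
                for (byte[] record : records) {
                    buf.clear();
                    buf.put(record);
                    buf.flip();
                    while (buf.hasRemaining()) {
                        ch.write(buf);
                    }
                }
            }
            // The run files produced here would later be merged back together; with too
            // many runs, intermediate merges keep the final merge fan-in below a limit
            // (128 by default, as described above).
            return run;
        }

        public static void main(String[] args) throws IOException {
            Random rnd = new Random(42);
            for (int i = 0; i < 2; i++) { // two batches stand in for "memory ran out twice"
                byte[][] batch = new byte[10_000][RECORD_LEN];
                for (byte[] record : batch) {
                    rnd.nextBytes(record);
                }
                Path run = sortAndSpill(batch);
                System.out.println("spilled run: " + run + " (" + Files.size(run) + " bytes)");
            }
        }
    }

If the earlier observation in this thread holds (all spill I/O going through SegmentReadRequest/SegmentWriteRequest and thus java.nio.channels.FileChannel), then both the spill writes and the subsequent merge reads should ultimately surface as FileChannel traffic of this kind.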
On Fri, Apr 7, 2017 at 4:20 PM, Robert Schmidtke <ro.schmid...@gmail.com> wrote:

Hi,

I'm currently examining the I/O patterns of Flink, and I'd like to know when and how Flink goes to disk. Let me give an overview of what I have done so far.

I am running TeraGen (from the Hadoop examples package) + TeraSort (https://github.com/robert-schmidtke/terasort) on a 16-node cluster, each node with 64 GiB of memory, 2x32 cores, and roughly half a terabyte of disk. I'm using YARN and HDFS. The underlying file system is XFS.

Before running TeraGen and TeraSort, I reset the XFS counters to zero, and after TeraGen + TeraSort have finished, I dump the XFS counters again. Accumulated over the entire cluster I get 3 TiB of writes and 3.2 TiB of reads. What I would have expected is 2 TiB of writes (1 for TeraGen, 1 for TeraSort) and 1 TiB of reads (during TeraSort).

Unsatisfied by the coarseness of these numbers, I developed an HDFS wrapper that logs file system statistics for each call to hdfs://..., such as start time/end time, number of bytes read/written, etc. I can plot these numbers and see what I expect: during TeraGen I have 1 TiB of writes to hdfs://..., during TeraSort I have 1 TiB of reads from and 1 TiB of writes to hdfs://... So far, so good.

This still did not explain the disk I/O, so I added bytecode instrumentation to a range of Java classes, like FileIn/OutputStream, RandomAccessFile, FileChannel, ZipFile, multiple *Buffer classes for memory-mapped files, etc., and log the same statistics there: start/end of a read from or write to disk, number of bytes involved, and so on. I can plot these numbers too and see that the HDFS JVMs write 1 TiB of data to disk during TeraGen (expected) and read and write 1 TiB from and to disk during TeraSort (expected).

Sorry for the enormous introduction, but now finally the interesting part: Flink's JVMs read from and write to disk 1 TiB of data each during TeraSort. I suspect there is some sort of spilling involved, potentially because I have not done the setup properly. But that is not the crucial point: my statistics give a total of 3 TiB of writes to disk (2 TiB for HDFS, 1 TiB for Flink), which agrees with the XFS counters from above. However, my statistics only give 2 TiB of reads from disk (1 TiB for HDFS, 1 TiB for Flink), so I'm missing an entire TiB of reads from disk somewhere. I have done the same with Hadoop TeraSort, and there I'm not missing any data, meaning my statistics agree with XFS for TeraSort on Hadoop, which is why I suspect there are cases where Flink goes to disk without me noticing it.
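A note on narrowing this down: on Linux, the kernel keeps per-process I/O accounting under /proc/<pid>/io (readable by the owning user or root), and its read_bytes/write_bytes counters record what was actually submitted to the storage layer on behalf of each process. Comparing those counters for the Flink and HDFS JVMs against the bytecode-level numbers would show whether the missing reads happen inside the traced JVMs (pointing at an instrumentation gap) or in some other process entirely. A minimal sketch, assuming such a Linux setup (class name invented):

    // Print the kernel's block-level I/O counters for a process.
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class ProcIoCheck {
        public static void main(String[] args) throws IOException {
            // Inspect a given PID, or the current JVM when no argument is passed.
            String pid = args.length > 0 ? args[0] : "self";
            for (String line : Files.readAllLines(Paths.get("/proc/" + pid + "/io"))) {
                // Lines look like "read_bytes: 1234"; keep only the counters of bytes
                // that actually went to the storage layer (page-cache hits excluded).
                if (line.startsWith("read_bytes:") || line.startsWith("write_bytes:")) {
                    System.out.println(line);
                }
            }
        }
    }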
So, finally, the question: in which cases does Flink go to disk, and how does it do so (meaning precisely which Java classes are involved, so I can check my bytecode instrumentation)? I guess this would also include any kind of resource distribution via HDFS/YARN (like JAR files and whatever else). Seeing that I'm missing an amount of data equal to the size of my input set, I suspect there is some sort of shuffling/spilling at play here, but I'm not sure. Maybe there is also some remote I/O via sockets or the like that I'm missing.

Any hints as to where Flink might incur disk I/O are greatly appreciated! I'm also happy to do the digging myself once pointed to the proper packages in the Apache Flink source tree (I have done my fair share of inspection already, but could not be sure whether or not I have missed something). Thanks a lot in advance!

Robert
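On the question of which Java classes can touch disk: independent of Flink's own code paths, java.nio itself offers several file access routes that byte-counting instrumentation of the plain read(ByteBuffer)/write(ByteBuffer) overloads does not automatically cover. Whether any of them is involved here is an open question; the sketch below (class and method names invented for illustration) merely enumerates the usual blind spots worth double-checking in the instrumentation.

    // Hypothetical checklist of FileChannel read paths that bypass read(ByteBuffer).
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.channels.WritableByteChannel;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    public class FileChannelBlindSpots {

        static void touchAllReadPaths(Path file, WritableByteChannel sink) throws IOException {
            try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {

                // 1. Positional reads: a separate overload from read(ByteBuffer).
                ch.read(ByteBuffer.allocate(4096), 0L);

                // 2. Scattering reads: another overload, taking an array of buffers.
                ByteBuffer[] buffers = { ByteBuffer.allocate(4096), ByteBuffer.allocate(4096) };
                ch.read(buffers, 0, buffers.length);

                // 3. Channel-to-channel transfers (sendfile-style): the bytes never pass
                //    through a user-space buffer that per-call counting could inspect.
                ch.transferTo(0L, ch.size(), sink);

                // 4. Memory-mapped I/O: after map(), reads happen via page faults on the
                //    MappedByteBuffer, with no further FileChannel calls at all.
                MappedByteBuffer mapped = ch.map(FileChannel.MapMode.READ_ONLY, 0L, Math.min(ch.size(), 4096L));
                if (mapped.hasRemaining()) {
                    mapped.get(); // a byte read from the file without any read() call
                }
            }
        }
    }

The analogous write paths (positional, gathering, transferFrom, read-write mappings) exist as well, and RandomAccessFile and the stream classes have their own positional and array-based variants, so similar coverage checks apply there.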