Hey Martin,

I'm still on it. I have switched to analyzing the flink-runtime tests, as I
observe similar divergence there. I'm not sure how long it'll take, but if
I find something I'll make sure to let you all know :)

Robert

On Sat, Apr 29, 2017 at 3:12 PM, Martin Eden <martineden...@gmail.com>
wrote:

> Hi Robert,
>
> Any updates on the below for the community?
>
> Thanks,
> M
>
> On Tue, Apr 25, 2017 at 8:50 AM, Robert Schmidtke <ro.schmid...@gmail.com>
> wrote:
>
>> Hi Ufuk, thanks for coming back to me on this.
>>
>> The records are 100 bytes in size, the benchmark being TeraSort, so that
>> should not be an issue. I have played around with the input size, and here
>> are my observations:
>>
>> 128 GiB input: 0 Spilling in Flink.
>> 256 GiB input: 88 GiB Spilling in Flink (so 88 GiB of reads, 88 GiB of
>> writes), and my instrumentation covers all of it.
>> 384 GiB input: 391 GiB Spilling in Flink, and I cover all of it.
>> 512 GiB input: 522 GiB Spilling in Flink, but I miss 140 GiB of it.
>> 640 GiB input: 653 GiB Spilling in Flink, but I miss 281 GiB of it.
>> 768 GiB input: 784 GiB Spilling in Flink, but I miss 490 GiB of it.
>> 896 GiB input: 914 GiB Spilling in Flink, but I miss 662 GiB of it.
>> 1024 GiB input: 1045 GiB Spilling in Flink, but I miss 968 GiB of it.
>>
>> So regardless of how well configured my system is and spilling is even
>> necessary, it seems that with larger spilling amounts, the way the data is
>> spilled changes (and I start missing larger and larger portions of I/O
>> until almost 100%).
>> Now since I have written the instrumentation myself, I cannot guarantee
>> that it is flawless and I might have missed something.
>> I'm currently looking into how the file channels are being accessed in
>> parallel by multiple threads, which I cover as well and my tests verify it,
>> but maybe there are special access patterns here.
>>
>> Robert
>>
>> On Mon, Apr 24, 2017 at 2:25 PM, Ufuk Celebi <u...@apache.org> wrote:
>>
>>> Hey Robert,
>>>
>>> for batch that should cover the relevant spilling code. If the records
>>> are >= 5 MB, the SpillingAdaptiveSpanningRecordDeserializer will spill
>>> incoming records as well. But that should be covered by the
>>> FileChannel instrumentation as well?
>>>
>>> – Ufuk
>>>
>>>
>>> On Tue, Apr 18, 2017 at 3:57 PM, Robert Schmidtke
>>> <ro.schmid...@gmail.com> wrote:
>>> > Hi,
>>> >
>>> > I have already looked at the UnilateralSortMerger, concluding that all
>>> I/O
>>> > eventually goes via SegmentReadRequest and SegmentWriteRequest (which
>>> in
>>> > turn use java.nio.channels.FileChannel) in AsynchronousFileIOChannel.
>>> Are
>>> > there more interaction points between Flink and the underlying file
>>> system
>>> > that I might want to consider?
>>> >
>>> > Thanks!
>>> > Robert
>>> >
>>> > On Fri, Apr 7, 2017 at 5:02 PM, Kurt Young <ykt...@gmail.com> wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> You probably want check out UnilateralSortMerger.java, this is the
>>> class
>>> >> which is responsible for external sort for flink. Here is a short
>>> >> description for how it works: there are totally 3 threads working
>>> together,
>>> >> one for reading, one for sorting partial data in memory, and the last
>>> one is
>>> >> responsible for spilling. Flink will first figure out how many memory
>>> it can
>>> >> use during the in-memory sort, and manage them as MemorySegments.
>>> Once these
>>> >> memory runs out, the sorting thread will take over these memory and
>>> do the
>>> >> in-memory sorting (For more details about in-memory sorting, you can
>>> see
>>> >> NormalizedKeySorter). After this, the spilling thread will write this
>>> sorted
>>> >> data to disk and make these memory available again for reading. This
>>> will
>>> >> repeated until all data has been processed.
>>> >> Normally, the data will be read twice (one from source, and one from
>>> disk)
>>> >> and write once, but if you spilled too much files, flink will first
>>> merge
>>> >> some all the files and make sure the last merge step will not exceed
>>> some
>>> >> limit (default 128). Hope this can help you.
>>> >>
>>> >> Best,
>>> >> Kurt
>>> >>
>>> >> On Fri, Apr 7, 2017 at 4:20 PM, Robert Schmidtke <
>>> ro.schmid...@gmail.com>
>>> >> wrote:
>>> >>>
>>> >>> Hi,
>>> >>>
>>> >>> I'm currently examining the I/O patterns of Flink, and I'd like to
>>> know
>>> >>> when/how Flink goes to disk. Let me give an introduction of what I
>>> have done
>>> >>> so far.
>>> >>>
>>> >>> I am running TeraGen (from the Hadoop examples package) + TeraSort
>>> >>> (https://github.com/robert-schmidtke/terasort) on a 16 node
>>> cluster, each
>>> >>> node with 64 GiB of memory, 2x32 cores, and roughly half a terabyte
>>> of disk.
>>> >>> I'm using YARN and HDFS. The underlying file system is XFS.
>>> >>>
>>> >>> Now before running TeraGen and TeraSort, I reset the XFS counters to
>>> >>> zero, and after TeraGen + TeraSort are finished, I dump the XFS
>>> counters
>>> >>> again. Accumulated over the entire cluster I get 3 TiB of writes and
>>> 3.2 TiB
>>> >>> of reads. What I'd have expected would be 2 TiB of writes (1 for
>>> TeraGen, 1
>>> >>> for TeraSort) and 1 TiB of reads (during TeraSort).
>>> >>>
>>> >>> Unsatisfied by the coarseness of these numbers I developed an HDFS
>>> >>> wrapper that logs file system statistics for each call to
>>> hdfs://..., such
>>> >>> as start time/end time, no. of bytes read/written etc. I can plot
>>> these
>>> >>> numbers and see what I expect: during TeraGen I have 1 TiB of writes
>>> to
>>> >>> hdfs://..., during TeraSort I have 1 TiB of reads from and 1 TiB of
>>> writes
>>> >>> to hdfs://... So far, so good.
>>> >>>
>>> >>> Now this still did not explain the disk I/O, so I added bytecode
>>> >>> instrumentation to a range of Java classes, like FileIn/OutputStream,
>>> >>> RandomAccessFile, FileChannel, ZipFile, multiple *Buffer classes for
>>> memory
>>> >>> mapped files etc., and have the same statistics: start/end of a read
>>> >>> from/write to disk, no. of bytes involved and such. I can plot these
>>> numbers
>>> >>> too and see that the HDFS JVMs write 1 TiB of data to disk during
>>> TeraGen
>>> >>> (expected) and read and write 1 TiB from and to disk during TeraSort
>>> >>> (expected).
>>> >>>
>>> >>> Sorry for the enormous introduction, but now there's finally the
>>> >>> interesting part: Flink's JVMs read from and write to disk 1 TiB of
>>> data
>>> >>> each during TeraSort. I'm suspecting there is some sort of spilling
>>> >>> involved, potentially because I have not done the setup properly.
>>> But that
>>> >>> is not the crucial point: my statistics give a total of 3 TiB of
>>> writes to
>>> >>> disk (2 TiB for HDFS, 1 TiB for Flink), which agrees with the XFS
>>> counters
>>> >>> from above. However, my statistics only give 2 TiB of reads from
>>> disk (1 TiB
>>> >>> for HDFS, 1 TiB for Flink), so I'm missing an entire TiB of reads
>>> from disk
>>> >>> somewhere. I have done the same with Hadoop TeraSort, and there I'm
>>> not
>>> >>> missing any data, meaning my statistics agree with XFS for TeraSort
>>> on
>>> >>> Hadoop, which is why I suspect there are some cases where Flink goes
>>> to disk
>>> >>> without me noticing it.
>>> >>>
>>> >>> Therefore here finally the question: in which cases does Flink go to
>>> >>> disk, and how does it do so (meaning precisely which Java classes are
>>> >>> involved, so I can check my bytecode instrumentation)? This would
>>> also
>>> >>> include any kind of resource distribution via HDFS/YARN I guess
>>> (like JAR
>>> >>> files and I don't know what). Seeing that I'm missing an amount of
>>> data
>>> >>> equal to the size of my input set I'd suspect there must be some
>>> sort of
>>> >>> shuffling/spilling at play here, but I'm not sure. Maybe there is
>>> also some
>>> >>> sort of remote I/O involved via sockets or so that I'm missing.
>>> >>>
>>> >>> Any hints as to where Flink might incur disk I/O are greatly
>>> appreciated!
>>> >>> I'm also happy with doing the digging myself, once pointed to the
>>> proper
>>> >>> packages in the Apache Flink source tree (I have done my fair share
>>> of
>>> >>> inspection already, but could not be sure whether or not I have
>>> missed
>>> >>> something). Thanks a lot in advance!
>>> >>>
>>> >>> Robert
>>> >>>
>>> >>> --
>>> >>> My GPG Key ID: 336E2680
>>> >>
>>> >>
>>> >
>>> >
>>> >
>>> > --
>>> > My GPG Key ID: 336E2680
>>>
>>
>>
>>
>> --
>> My GPG Key ID: 336E2680
>>
>
>


-- 
My GPG Key ID: 336E2680

Reply via email to