Running with these settings:
yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120
akka.ask.timeout: 60s
containerized.heap-cutoff-ratio: 0.15
taskmanager.memory.fraction: 0.7
taskmanager.memory.off-heap: false
taskmanager.memory.preallocate: true
env.getConfig().setExecutionMode(ExecutionMode.BATCH)

Looks like it's running a little faster than the original settings; the sort is
not causing OOM at least.


What do you mean by no direct memory buffer?  The taskmanagers look to
report correct capacity under the Outside JVM section.

Was googling around and ran into this:
https://github.com/netty/netty/issues/6813  It seemed promising, but I don't see
-XX:+DisableExplicitGC being added anywhere in the YARN launch_container.sh.
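
In case it helps, my rough plan for experimenting with JVM flags is to pass them
through flink-conf.yaml; my understanding is that env.java.opts gets forwarded
into the container JVM command line, but I haven't verified that shows up in
launch_container.sh yet, so treat this as a guess:

# flink-conf.yaml (unverified guess on my part)
env.java.opts: -XX:+DisableExplicitGC
# or, to raise the direct memory ceiling instead:
# env.java.opts: -XX:MaxDirectMemorySize=2g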


On Thu, Dec 7, 2017 at 12:39 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Ah, no direct memory buffer...
> Can you try to disable off-heap memory?
>
> 2017-12-07 18:35 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>
>> Stacktrace generates every time with the following settings (tried
>> different memory fractions):
>> yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120
>> akka.ask.timeout: 60s
>> containerized.heap-cutoff-ratio: 0.15
>> taskmanager.memory.fraction: 0.7/0.3/0.1
>> taskmanager.memory.off-heap: true
>> taskmanager.memory.preallocate: true
>> env.getConfig().setExecutionMode(ExecutionMode.BATCH)
>>
>> Hand Jammed top of the stack:
>> java.lang.RuntimeException: Error obtaining the sorted input: Thread
>> 'SortMerger Reading Thread' terminated due to an exception:
>> java.lang.OutOfMemoryError: Direct buffer memory
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
>> at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:82)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated
>> due to an exception: java.lang.OutOfMemoryError: Direct buffer memory
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>> Caused by: org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>> java.lang.OutOfMemoryError: Direct buffer memory
>> at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:149)
>> ... lots of netty stuffs
>>
>>
>> While I observe the taskmanagers I never see their JVM heaps get high at
>> all.  Mind you, I can't tell which task will blow and then find its TM in time
>> to see what it looks like.  But for each one I do look at, the heap usage is
>> ~150MB/6.16GB (with fraction: 0.1).
>>
>> On Thu, Dec 7, 2017 at 11:59 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hmm, the OOM sounds like a bug to me. Can you provide the stacktrace?
>>> The managed memory should be divided among all possible consumers. In the
>>> case of your simple job, this should just be the sorter.
>>> In fact, I'd try to reduce the fraction to give more memory to the JVM
>>> heap (OOM means there was not enough (heap) memory).
>>>
>>> Enabling BATCH mode means that the records are not shipped to the sorter
>>> in a pipelined fashion but are buffered at (and written to the disk of) the
>>> sender task.
>>> Once the input has been consumed, the data is shipped to the receiver tasks
>>> (the sorter). This mode decouples tasks and also reduces the number of
>>> network buffers because fewer connections must be active at the same time.
>>> Here's a link to an internal design document (not sure how up to date it
>>> is though...) [1].
>>>
>>> Did you try to check if the problem is caused by data skew?
>>> You could add a MapPartition task instead of the PartitionSorter to
>>> count the number of records per partition.
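>>>
>>> A rough sketch of what I mean (MyRecord and the key fields [0,2] are just
>>> placeholders based on your sort keys, so adjust to your actual types):
>>>
>>> // imports: org.apache.flink.api.common.functions.RichMapPartitionFunction,
>>> //          org.apache.flink.api.java.tuple.Tuple2, org.apache.flink.util.Collector
>>> // hypothetical skew check: emit (subtask index, record count) per partition
>>> input
>>>     .partitionByHash(0, 2)
>>>     .mapPartition(new RichMapPartitionFunction<MyRecord, Tuple2<Integer, Long>>() {
>>>         @Override
>>>         public void mapPartition(Iterable<MyRecord> values,
>>>                                  Collector<Tuple2<Integer, Long>> out) {
>>>             long count = 0;
>>>             for (MyRecord r : values) {
>>>                 count++;
>>>             }
>>>             out.collect(Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), count));
>>>         }
>>>     })
>>>     .print();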
>>>
>>> Best, Fabian
>>>
>>> [1] https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks
>>>
>>> 2017-12-07 16:30 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>
>>>> Thanks for the reply again,
>>>>
>>>>  I'm currently doing runs with:
>>>> yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120
>>>> akka.ask.timeout: 60s
>>>> containerized.heap-cutoff-ratio: 0.15
>>>> taskmanager.memory.fraction: 0.7
>>>> taskmanager.memory.off-heap: true
>>>> taskmanager.memory.preallocate: true
>>>>
>>>> When I change the config setExecutionMode() to BATCH, no matter what
>>>> memory fraction I choose the sort instantly fails with SortMerger OOM
>>>> exceptions, even when I set the fraction to 0.95.  The data source part is
>>>> ridiculously fast though, ~30 seconds!  Disabling batch mode and keeping
>>>> the other changes appears to give the same behavior as before; the job's
>>>> been running for ~20 minutes now.  Does BATCH mode disable spilling to disk,
>>>> or does BATCH combined with off-heap disable spilling to disk?  Is there
>>>> more documentation on what BATCH mode does under the covers?
>>>>
>>>> As for the flow itself, yes, it used to be a lot smaller. I broke it out
>>>> manually by adding the sort/partition to see which steps were causing the
>>>> slowdown; thinking it was my code, I wanted to separate the operations.
>>>>
>>>> Thank you again for your help.
>>>>
>>>> On Thu, Dec 7, 2017 at 4:49 AM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> That doesn't look like a bad configuration.
>>>>>
>>>>> I have to correct myself regarding the size of the managed memory. The
>>>>> fraction (70%) is applied to the free memory after TM initialization.
>>>>> This means that the memory for network buffers (and other data structures)
>>>>> is subtracted before the managed memory is allocated.
>>>>> The actual size of the managed memory is logged in the TM log file
>>>>> during start up.
>>>>>
>>>>> You could also try to decrease the number of slots per TM to 1 but add
>>>>> more vCores (yarn.containers.vcores [1]) because the sorter runs in
>>>>> multiple threads.
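>>>>>
>>>>> For example (just a sketch, reusing the numbers from your earlier command):
>>>>>
>>>>> yarn-session.sh -n 700 -s 1 -tm 9200 -jm 5120
>>>>> # flink-conf.yaml
>>>>> yarn.containers.vcores: 2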
>>>>>
>>>>> Adding a GroupCombineFunction for pre-aggregation (if possible...)
>>>>> would help to mitigate the effects of the data skew.
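>>>>>
>>>>> For reference, a combiner could look roughly like this; whether it applies
>>>>> depends on whether your GroupReduce is really an aggregation, and the
>>>>> Tuple2<String, Long> type and key field are placeholders:
>>>>>
>>>>> // imports: org.apache.flink.api.common.functions.GroupCombineFunction,
>>>>> //          org.apache.flink.api.java.tuple.Tuple2, org.apache.flink.util.Collector
>>>>> // hypothetical pre-aggregation sketch; combines locally before the shuffle
>>>>> DataSet<Tuple2<String, Long>> combined = input
>>>>>     .groupBy(0)
>>>>>     .combineGroup(new GroupCombineFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
>>>>>         @Override
>>>>>         public void combine(Iterable<Tuple2<String, Long>> values,
>>>>>                             Collector<Tuple2<String, Long>> out) {
>>>>>             String key = null;
>>>>>             long sum = 0L;
>>>>>             for (Tuple2<String, Long> v : values) {
>>>>>                 key = v.f0;
>>>>>                 sum += v.f1;
>>>>>             }
>>>>>             if (key != null) {
>>>>>                 out.collect(Tuple2.of(key, sum));
>>>>>             }
>>>>>         }
>>>>>     });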
>>>>> Another thing I'd like to ask: are you adding the partitioner and
>>>>> sorter explicitly to the plan, and if so, why? Usually, the partitioning and
>>>>> sorting are done as part of the GroupReduce.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#yarn
>>>>>
>>>>> 2017-12-06 23:32 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>>>
>>>>>> Wow thank you for the reply, you gave me a lot to look into and mess
>>>>>> with. I'll start testing with the various memory options and env settings
>>>>>> tomorrow.
>>>>>>
>>>>>> BTW the current flink cluster is launched like:
>>>>>> yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120
>>>>>>
>>>>>> with flink-conf.yaml property overrides of:
>>>>>> # so bigger clusters don't fail to init
>>>>>> akka.ask.timeout: 60s
>>>>>> # so more memory is given to the JVM from the yarn container
>>>>>> containerized.heap-cutoff-ratio: 0.15
>>>>>>
>>>>>> So each Flink slot doesn't necessarily get a lot of RAM. You said 70%
>>>>>> of RAM goes to the job by default, so that's (9200*0.85)*0.70 = 5474MB. So
>>>>>> each slot is sitting with ~2737MB of usable space.  Would you have a
>>>>>> different config for taking overall the same amount of RAM?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Dec 6, 2017 at 11:49 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Garrett,
>>>>>>>
>>>>>>> data skew might be a reason for the performance degradation.
>>>>>>>
>>>>>>> The plan you shared is pretty simple. The following happens when you run
>>>>>>> the program:
>>>>>>> - The data source starts to read data and pushes the records to the
>>>>>>> FlatMapFunction. From there the records are shuffled (using
>>>>>>> hash-partitioning) to the sorter.
>>>>>>> - The sorter tasks consume the records and write them into a memory
>>>>>>> buffer. When the buffer is full, it is sorted and spilled to disk. Once the
>>>>>>> buffer has been spilled, it is filled again with records, sorted, and
>>>>>>> spilled.
>>>>>>> - The initially fast processing happens because at the beginning the
>>>>>>> sorter does not have to wait for buffers to be sorted or spilled, since
>>>>>>> they are still empty.
>>>>>>>
>>>>>>> The performance of the plan depends (among other things) on the size
>>>>>>> of the sort buffers. The sort buffers are taken from Flink's managed
>>>>>>> memory.
>>>>>>> Unless you configured something else, 70% of the TaskManager heap
>>>>>>> memory is reserved as managed memory.
>>>>>>> If you use Flink only for batch jobs, I would enable preallocation
>>>>>>> and off-heap memory (see configuration options [1]). You can also configure
>>>>>>> a fixed size for the managed memory. The more memory you configure, the
>>>>>>> more is available for sorting.
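>>>>>>>
>>>>>>> For example, something like this in flink-conf.yaml (the fixed size is
>>>>>>> just a placeholder value):
>>>>>>>
>>>>>>> taskmanager.memory.off-heap: true
>>>>>>> taskmanager.memory.preallocate: true
>>>>>>> # either a fraction of the remaining heap ...
>>>>>>> taskmanager.memory.fraction: 0.7
>>>>>>> # ... or a fixed amount in MB, which takes precedence over the fraction
>>>>>>> taskmanager.memory.size: 4096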
>>>>>>>
>>>>>>> The managed memory of a TM is evenly distributed to all of its
>>>>>>> processing slots. Hence, having more slots per TM means that each slot has
>>>>>>> less managed memory (for sorting or joins or ...).
>>>>>>> So many slots are not necessarily good for performance (unless you
>>>>>>> increase the number of TMs / memory as well), especially in the case of
>>>>>>> data skew, when most slots receive only a little data and cannot leverage
>>>>>>> their memory.
>>>>>>> If your data is heavily skewed, it might make sense to have fewer
>>>>>>> slots such that each slot has more memory for sorting.
>>>>>>>
>>>>>>> Skew also has an effect on downstream operations. In case of skew,
>>>>>>> some of the sorter tasks are overloaded and cannot accept more data.
>>>>>>> Due to the pipelined shuffles, this leads to back pressure
>>>>>>> that propagates down to the sources.
>>>>>>> You can disable pipelining by setting the execution mode on the
>>>>>>> execution configuration to BATCH [2]. This will break the pipeline but
>>>>>>> write the result of the FlatMap to disk.
>>>>>>> This might help if the FlatMap is compute intensive or filters many
>>>>>>> records.
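>>>>>>>
>>>>>>> In code that is just:
>>>>>>>
>>>>>>> // import org.apache.flink.api.common.ExecutionMode;
>>>>>>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>> env.getConfig().setExecutionMode(ExecutionMode.BATCH);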
>>>>>>>
>>>>>>> The data sizes don't sound particularly large, so this should be
>>>>>>> something that Flink should be able to handle.
>>>>>>>
>>>>>>> Btw, you don't need to convert the JSON plan output. You can paste
>>>>>>> it into the plan visualizer [3].
>>>>>>> I would not worry about the missing statistics. The optimizer does
>>>>>>> not leverage them in its current state.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#managed-memory
>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/execution_configuration.html
>>>>>>> [3] http://flink.apache.org/visualizer/
>>>>>>>
>>>>>>> 2017-12-06 16:45 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>
>>>>>>> :
>>>>>>>
>>>>>>>> Fabian,
>>>>>>>>
>>>>>>>>  Thank you for the reply.  Yes, I do watch via the UI; is there
>>>>>>>> another way to see progress through the steps?
>>>>>>>>
>>>>>>>> I think I just figured it out: the hangup is in the sort phase (ID
>>>>>>>> 4), where 2 slots take all the time.  Looking in the UI, most slots get
>>>>>>>> less than 500MB of data to sort; these two have 6.7GB and 7.3GB each.
>>>>>>>> Together it's about 272M records, and these will run for hours at this
>>>>>>>> point.  Looks like I need to figure out a different partitioning/sort
>>>>>>>> strategy. I never noticed before because when I run the system at ~1400
>>>>>>>> slots I don't use the UI anymore as it gets unresponsive.  400 slots is
>>>>>>>> painfully slow, but still works.
>>>>>>>>
>>>>>>>>
>>>>>>>> The getEnv output is very cool! Also very big; I've tried to
>>>>>>>> summarize it here in more of a yaml format as it's on a different network.
>>>>>>>> Note the parallelism was just set to 10 as I didn't know if that affected
>>>>>>>> the output.  Hopefully I didn't flub a copy paste step; it looks good to
>>>>>>>> me.
>>>>>>>>
>>>>>>>>
>>>>>>>> This flow used to have far fewer steps, but as it wasn't scaling I
>>>>>>>> broke it out into all the distinct pieces so I could see where it failed.
>>>>>>>> Source and sink are both Hive tables.  I wonder if the inputformat is
>>>>>>>> expected to give more info to seed some of these stat values?
>>>>>>>>
>>>>>>>> nodes
>>>>>>>>     id: 6
>>>>>>>>     type: source
>>>>>>>>     pact: Data Source
>>>>>>>>     contents: at CreateInput(ExecutionEnvironment.java:533)
>>>>>>>>     parallelism: 10
>>>>>>>>     global_properties:
>>>>>>>>         name: partitioning v: RANDOM_PARTITIONED
>>>>>>>>         name: Partitioning Order value: none
>>>>>>>>         name: Uniqueness value: not unique
>>>>>>>>     local_properties:
>>>>>>>>         name: Order value: none
>>>>>>>>         name: Grouping value: not grouped
>>>>>>>>         name: Uniqueness value: not unique
>>>>>>>>     estimates:
>>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>>     costs:
>>>>>>>>         name: Network value: 0
>>>>>>>>         name: Disk I/O value 0
>>>>>>>>         name: CPU value: 0
>>>>>>>>         name: Cumulative Network value: 0
>>>>>>>>         name: Cumulative Disk I/O value: 0
>>>>>>>>         name: Cumulative CPU value: 0
>>>>>>>>     compiler_hints:
>>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>>         name: Output Cardinality value: none
>>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>>         name: Filter Factor value: none
>>>>>>>>
>>>>>>>>     id: 5
>>>>>>>>     type: pact
>>>>>>>>     pact: FlatMap
>>>>>>>>     contents: FlatMap at main()
>>>>>>>>     parallelism: 10
>>>>>>>>     predecessors:
>>>>>>>>         id: 6, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>>>>     driver_strategy: FlatMap
>>>>>>>>     global_properties:
>>>>>>>>         name: partitioning v: RANDOM_PARTITIONED
>>>>>>>>         name: Partitioning Order value: none
>>>>>>>>         name: Uniqueness value: not unique
>>>>>>>>     local_properties:
>>>>>>>>         name: Order value: none
>>>>>>>>         name: Grouping value: not grouped
>>>>>>>>         name: Uniqueness value: not unique
>>>>>>>>     estimates:
>>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>>     costs:
>>>>>>>>         name: Network value: 0
>>>>>>>>         name: Disk I/O value 0
>>>>>>>>         name: CPU value: 0
>>>>>>>>         name: Cumulative Network value: 0
>>>>>>>>         name: Cumulative Disk I/O value: 0
>>>>>>>>         name: Cumulative CPU value: 0
>>>>>>>>     compiler_hints:
>>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>>         name: Output Cardinality value: none
>>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>>         name: Filter Factor value: none
>>>>>>>>
>>>>>>>>     id: 4
>>>>>>>>     type: pact
>>>>>>>>     pact: Sort-Partition
>>>>>>>>     contents: Sort at main()
>>>>>>>>     parallelism: 10
>>>>>>>>     predecessors:
>>>>>>>>         id: 5, ship_strategy: Hash Partition on [0,2]
>>>>>>>> local_strategy: Sort on [0:ASC,2:ASC,1:ASC], exchange_mode: PIPELINED
>>>>>>>>     driver_strategy: No-Op
>>>>>>>>     global_properties:
>>>>>>>>         name: partitioning v: HASH_PARTITIONED
>>>>>>>>         name: Partitioned on value: [0,2]
>>>>>>>>         name: Partitioning Order value: none
>>>>>>>>         name: Uniqueness value: not unique
>>>>>>>>     local_properties:
>>>>>>>>         name: Order value: [0:ASC,2:ASC,1:ASC]
>>>>>>>>         name: Grouping value: [0,2,1]
>>>>>>>>         name: Uniqueness value: not unique
>>>>>>>>     estimates:
>>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>>     costs:
>>>>>>>>         name: Network value: 0
>>>>>>>>         name: Disk I/O value 0
>>>>>>>>         name: CPU value: 0
>>>>>>>>         name: Cumulative Network value: unknown
>>>>>>>>         name: Cumulative Disk I/O value: unknown
>>>>>>>>         name: Cumulative CPU value: unknown
>>>>>>>>     compiler_hints:
>>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>>         name: Output Cardinality value: none
>>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>>         name: Filter Factor value: none
>>>>>>>>
>>>>>>>>     id: 3
>>>>>>>>     type: pact
>>>>>>>>     pact: GroupReduce
>>>>>>>>     contents: GroupReduce at first(SortedGrouping.java:210)
>>>>>>>>     parallelism: 10
>>>>>>>>     predecessors:
>>>>>>>>         id: 4, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>>>>     driver_strategy: Sorted Group Reduce
>>>>>>>>     global_properties:
>>>>>>>>         name: partitioning v: RANDOM_PARTITIONED
>>>>>>>>         name: Partitioning Order value: none
>>>>>>>>         name: Uniqueness value: not unique
>>>>>>>>     local_properties:
>>>>>>>>         name: Order value: none
>>>>>>>>         name: Grouping value: not grouped
>>>>>>>>         name: Uniqueness value: not unique
>>>>>>>>     estimates:
>>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>>     costs:
>>>>>>>>         name: Network value: 0
>>>>>>>>         name: Disk I/O value 0
>>>>>>>>         name: CPU value: 0
>>>>>>>>         name: Cumulative Network value: unknown
>>>>>>>>         name: Cumulative Disk I/O value: unknown
>>>>>>>>         name: Cumulative CPU value: unknown
>>>>>>>>     compiler_hints:
>>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>>         name: Output Cardinality value: none
>>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>>         name: Filter Factor value: none
>>>>>>>>
>>>>>>>>
>>>>>>>>     id: 2
>>>>>>>>     type: pact
>>>>>>>>     pact: Map
>>>>>>>>     contents: Map at ()
>>>>>>>>     parallelism: 10
>>>>>>>>     predecessors:
>>>>>>>>         id: 3, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>>>>     driver_strategy: Map
>>>>>>>>     global_properties:
>>>>>>>>         name: partitioning v: RANDOM_PARTITIONED
>>>>>>>>         name: Partitioning Order value: none
>>>>>>>>         name: Uniqueness value: not unique
>>>>>>>>     local_properties:
>>>>>>>>         name: Order value: none
>>>>>>>>         name: Grouping value: not grouped
>>>>>>>>         name: Uniqueness value: not unique
>>>>>>>>     estimates:
>>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>>     costs:
>>>>>>>>         name: Network value: 0
>>>>>>>>         name: Disk I/O value 0
>>>>>>>>         name: CPU value: 0
>>>>>>>>         name: Cumulative Network value: unknown
>>>>>>>>         name: Cumulative Disk I/O value: unknown
>>>>>>>>         name: Cumulative CPU value: unknown
>>>>>>>>     compiler_hints:
>>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>>         name: Output Cardinality value: none
>>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>>         name: Filter Factor value: none
>>>>>>>>
>>>>>>>>     id: 1
>>>>>>>>     type: pact
>>>>>>>>     pact: Map
>>>>>>>>     contents: map at main()
>>>>>>>>     parallelism: 10
>>>>>>>>     predecessors:
>>>>>>>>         id: 2, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>>>>     driver_strategy: Map
>>>>>>>>     global_properties:
>>>>>>>>         name: partitioning v: RANDOM_PARTITIONED
>>>>>>>>         name: Partitioning Order value: none
>>>>>>>>         name: Uniqueness value: not unique
>>>>>>>>     local_properties:
>>>>>>>>         name: Order value: none
>>>>>>>>         name: Grouping value: not grouped
>>>>>>>>         name: Uniqueness value: not unique
>>>>>>>>     estimates:
>>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>>     costs:
>>>>>>>>         name: Network value: 0
>>>>>>>>         name: Disk I/O value 0
>>>>>>>>         name: CPU value: 0
>>>>>>>>         name: Cumulative Network value: unknown
>>>>>>>>         name: Cumulative Disk I/O value: unknown
>>>>>>>>         name: Cumulative CPU value: unknown
>>>>>>>>     compiler_hints:
>>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>>         name: Output Cardinality value: none
>>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>>         name: Filter Factor value: none
>>>>>>>>
>>>>>>>>     id: 0
>>>>>>>>     type: sink
>>>>>>>>     pact: Data Sink
>>>>>>>>     contents: org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat
>>>>>>>>     parallelism: 10
>>>>>>>>     predecessors:
>>>>>>>>         id: 1, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>>>>     driver_strategy: Map
>>>>>>>>     global_properties:
>>>>>>>>         name: partitioning v: RANDOM_PARTITIONED
>>>>>>>>         name: Partitioning Order value: none
>>>>>>>>         name: Uniqueness value: not unique
>>>>>>>>     local_properties:
>>>>>>>>         name: Order value: none
>>>>>>>>         name: Grouping value: not grouped
>>>>>>>>         name: Uniqueness value: not unique
>>>>>>>>     estimates:
>>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>>     costs:
>>>>>>>>         name: Network value: 0
>>>>>>>>         name: Disk I/O value 0
>>>>>>>>         name: CPU value: 0
>>>>>>>>         name: Cumulative Network value: unknown
>>>>>>>>         name: Cumulative Disk I/O value: unknown
>>>>>>>>         name: Cumulative CPU value: unknown
>>>>>>>>     compiler_hints:
>>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>>         name: Output Cardinality value: none
>>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>>         name: Filter Factor value: none
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Dec 5, 2017 at 5:36 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Flink's operators are designed to work in memory as long as
>>>>>>>>> possible and spill to disk once the memory budget is exceeded.
>>>>>>>>> Moreover, Flink aims to run programs in a pipelined fashion, such
>>>>>>>>> that multiple operators can process data at the same time.
>>>>>>>>> This behavior can make it a bit tricky to analyze the runtime
>>>>>>>>> behavior and progress of operators.
>>>>>>>>>
>>>>>>>>> It would be interesting to have a look at the execution plan for
>>>>>>>>> the program that you are running.
>>>>>>>>> The plan can be obtained from the ExecutionEnvironment by calling
>>>>>>>>> env.getExecutionPlan() instead of env.execute().
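>>>>>>>>>
>>>>>>>>> For example (sketch):
>>>>>>>>>
>>>>>>>>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>> // ... define the job as usual ...
>>>>>>>>> // instead of env.execute(), dump the optimized plan as JSON:
>>>>>>>>> String jsonPlan = env.getExecutionPlan();
>>>>>>>>> System.out.println(jsonPlan);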
>>>>>>>>>
>>>>>>>>> I would also like to know how you track the progress of the
>>>>>>>>> program.
>>>>>>>>> Are you looking at the record counts displayed in the WebUI?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Fabian
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2017-12-05 22:03 GMT+01:00 Garrett Barton <
>>>>>>>>> garrett.bar...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> I have been moving some old MR and Hive workflows into Flink
>>>>>>>>>> because I'm enjoying the APIs and the ease of development is wonderful.
>>>>>>>>>> Things have largely worked great until I tried to really scale some of
>>>>>>>>>> the jobs recently.
>>>>>>>>>>
>>>>>>>>>> I have, for example, one ETL job that reads in about 12B records at
>>>>>>>>>> a time and does a sort, some simple transformations, validation, a
>>>>>>>>>> re-partition, and then outputs to a Hive table.
>>>>>>>>>> When I built it with the sample set, ~200M, it worked great: it took
>>>>>>>>>> maybe a minute and blew through it.
>>>>>>>>>>
>>>>>>>>>> What I have observed is that there is some kind of saturation reached
>>>>>>>>>> depending on the number of slots, number of nodes, and the overall size
>>>>>>>>>> of data to move.  When I run the 12B set, the first 1B go through in
>>>>>>>>>> under 1 minute, really really fast.  But it's an extremely sharp drop off
>>>>>>>>>> after that: the next 1B might take 15 minutes, and then if I wait for the
>>>>>>>>>> next 1B, it's well over an hour.
>>>>>>>>>>
>>>>>>>>>> What I can't find is any obvious indicators or things to look at;
>>>>>>>>>> everything just grinds to a halt, and I don't think the job would ever
>>>>>>>>>> actually complete.
>>>>>>>>>>
>>>>>>>>>> Is there something in the design of Flink in batch mode that is
>>>>>>>>>> perhaps memory bound?  Adding more nodes/tasks does not fix it, just gets
>>>>>>>>>> me a little further along.  I'm already running around ~1,400 slots at
>>>>>>>>>> this point; I'd postulate needing 10,000+ to potentially make the job
>>>>>>>>>> run, but that's too much of my cluster gone, and I have yet to get Flink
>>>>>>>>>> to be stable past 1,500.
>>>>>>>>>>
>>>>>>>>>> Any ideas on where to look, or what to debug?  The GUI is also very
>>>>>>>>>> cumbersome to use at this slot count, so other measurement ideas are
>>>>>>>>>> welcome too!
>>>>>>>>>>
>>>>>>>>>> Thank you all.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
