Ah, no direct memory buffer...
Can you try to disable off-heap memory?

2017-12-07 18:35 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:

> Stacktrace generates every time with the following settings (tried
> different memory fractions):
> yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120
> akka.ask.timeout: 60s
> containerized.heap-cutoff-ratio: 0.15
> taskmanager.memory.fraction: 0.7/0.3/0.1
> taskmanager.memory.off-heap: true
> taskmanager.memory.preallocate: true
> env.getConfig().setExecutionMode(ExecutionMode.BATCH)
>
> Hand Jammed top of the stack:
> java.lang.RuntimeException: Error obtaining the sorted input: Thread
> 'SortMerger Reading Thread' terminated due to an exception:
> java.lang.OutOfMemoryError: Direct buffer memory
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.
> getInterator(UnilateralSortMerger.java:619)
> at org.apache.flink.runtime.operators.BatchTask.getInput(
> BatchTask.java:1095)
> at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:82)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Thread' terminated due to an exception:
> java.lang.OutOfMemoryError: Direct buffer memory
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: 
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
> java.lang.OutOfMemoryError: Direct buffer memory
> at org.apache.flink.runtime.io.network.netty.
> PartitionRequestClientHandler.exceptionCaught(
> PartitionRequestClientHandler.java:149)
> ... lots of netty stuffs
>
>
> While I observe the taskmanagers I never see their JVM heaps get high at
> all.  Mind you I cant tell which task will blow and then see its TM in time
> to see what it looks like.  But each one I do look at the heap usage is
> ~150MB/6.16GB (with fraction: 0.1)
>
> On Thu, Dec 7, 2017 at 11:59 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hmm, the OOM sounds like a bug to me. Can you provide the stacktrace?
>> The managed memory should be divided among all possible consumers. In
>> case of your simple job, this should just be Sorter.
>> In fact, I'd try to reduce the fraction to give more memory to the JVM
>> heap (OOM means there was not enough (heap) memory).
>>
>> Enabling BATCH mode means that the records are not shipped to the sorter
>> in a pipelined fashion but buffered at (and written to the disk of) the
>> sender task.
>> Once the input was consumed, the data is shipped to the receiver tasks
>> (the sorter). This mode decouples tasks and also reduces the number of
>> network buffers because fewer connection must be active at the same time.+
>> Here's a link to an internal design document (not sure how up to date it
>> is though...) [1].
>>
>> Did you try to check if the problem is cause by data skew?
>> You could add a MapPartition tasks instead of the PartitionSorter to
>> count the number of records per partition.
>>
>> Best, Fabian
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/Data+excha
>> nge+between+tasks
>>
>> 2017-12-07 16:30 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>>
>>> Thanks for the reply again,
>>>
>>>  I'm currently doing runs with:
>>> yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120
>>> akka.ask.timeout: 60s
>>> containerized.heap-cutoff-ratio: 0.15
>>> taskmanager.memory.fraction: 0.7
>>> taskmanager.memory.off-heap: true
>>> taskmanager.memory.preallocate: true
>>>
>>> When I change the config setExecutionMode() to BATCH, no matter what
>>> memory fraction I choose the sort instantly fails with SortMerger OOM
>>> exceptions.  Even when I set fraction to 0.95.  The data source part is
>>> ridiculously fast though, ~30 seconds!  Disabling batch mode and keeping
>>> the other changes looks like to do the same behavior as before, jobs been
>>> running for ~20 minutes now.  Does Batch mode disable spilling to disk, or
>>> does batch with a combo of off heap disable spilling to disk?  Is there
>>> more documentation on what Batch mode does under the covers?
>>>
>>> As for the flow itself, yes it used to be a lot smaller, I broke it out
>>> manually by adding the sort/partition to see which steps were causing me
>>> the slowdown, thinking it was my code, I wanted to separate the operations.
>>>
>>> Thank you again for your help.
>>>
>>> On Thu, Dec 7, 2017 at 4:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> That doesn't look like a bad configuration.
>>>>
>>>> I have to correct myself regarding the size of the managed memory. The
>>>> fraction (70%) is applied on the free memory after the TM initialization.
>>>> This means that memory for network buffers (and other data structures) are
>>>> subtracted before the managed memory is allocated.
>>>> The actual size of the managed memory is logged in the TM log file
>>>> during start up.
>>>>
>>>> You could also try to decrease the number of slots per TM to 1 but add
>>>> more vCores (yarn.containers.vcores []) because the sorter runs in
>>>> multiple threads.
>>>>
>>>> Adding a GroupCombineFunction for pre-aggregation (if possible...)
>>>> would help to mitigate the effects of the data skew.
>>>> Another thing I'd like to ask: Are you adding the partitioner and
>>>> sorter explicitly to the plan and if so why? Usually, the partitioning and
>>>> sorting is done as part of the GroupReduce.
>>>>
>>>> Best, Fabian
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>>>> setup/config.html#yarn
>>>>
>>>> 2017-12-06 23:32 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>>
>>>>> Wow thank you for the reply, you gave me a lot to look into and mess
>>>>> with. I'll start testing with the various memory options and env settings
>>>>> tomorrow.
>>>>>
>>>>> BTW the current flink cluster is launched like:
>>>>> yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120
>>>>>
>>>>> with flink-conf.yaml property overrides of:
>>>>> # so bigger clusters don't fail to init
>>>>> akka.ask.timeout: 60s
>>>>> # so more memory is given to the JVM from the yarn container
>>>>> containerized.heap-cutoff-ratio: 0.15
>>>>>
>>>>> So each flink slot doesn't necessarily get a lot of ram, you said 70%
>>>>> of ram goes to the job by default, so that's (9200*0.85)*0.70 = 5474MB.  
>>>>> So
>>>>> each slot is sitting with ~2737MB of usable space.  Would you have a
>>>>> different config for taking overall the same amount of ram?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Dec 6, 2017 at 11:49 AM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Garrett,
>>>>>>
>>>>>> data skew might be a reason for the performance degradation.
>>>>>>
>>>>>> The plan you shared is pretty simple. The following happens you run
>>>>>> the program:
>>>>>> - The data source starts to read data and pushes the records to the
>>>>>> FlatMapFunction. From there the records are shuffed (using
>>>>>> hash-partitioning) to the sorter.
>>>>>> - The sorter tasks consume the records and write them into a memory
>>>>>> buffer. When the buffer is full, it is sorted and spilled to disk. When 
>>>>>> the
>>>>>> buffer was spilled, it is filled again with records, sorted, and spilled.
>>>>>> - The initially fast processing happens because at the beginning the
>>>>>> sorter is not waiting for buffers to be sorted or spilled because they 
>>>>>> are
>>>>>> empty.
>>>>>>
>>>>>> The performance of the plan depends (among other things) on the size
>>>>>> of the sort buffers. The sort buffers are taken from Flink's managed
>>>>>> memory.
>>>>>> Unless you configured something else, 70% of to the TaskManager heap
>>>>>> memory is reserved as managed memory.
>>>>>> If you use Flink only for batch jobs, I would enable preallocation
>>>>>> and off-heap memory (see configuration options [1]). You can also 
>>>>>> configure
>>>>>> a fixed size for the managed memory. The more memory you configure, the
>>>>>> more is available for sorting.
>>>>>>
>>>>>> The managed memory of a TM is evenly distributed to all its
>>>>>> processing slots. Hence, having more slots per TM means that each slot 
>>>>>> has
>>>>>> fewer managed memory (for sorting or joins or ...).
>>>>>> So many slots are not necessarily good for performance (unless you
>>>>>> increase the number of TMs / memory as well), especially in case of data
>>>>>> skew when most slots receive only little data and cannot leverage their
>>>>>> memory.
>>>>>> If your data is heavily skewed, it might make sense to have fewer
>>>>>> slots such that each slot has more memory for sorting.
>>>>>>
>>>>>> Skew has also an effect on downstream operations. In case of skew,
>>>>>> some of the sorter tasks are overloaded and cannot accept more data.
>>>>>> Due to the pipelined shuffles, this leads to a back pressure behavior
>>>>>> that propagates down to the sources.
>>>>>> You can disable pipelining by setting the execution mode on the
>>>>>> execution configuration to BATCH [2]. This will break the pipeline but
>>>>>> write the result of the FlatMap to disk.
>>>>>> This might help, if the FlatMap is compute intensive or filters many
>>>>>> records.
>>>>>>
>>>>>> The data sizes don't sound particular large, so this should be
>>>>>> something that Flink should be able to handle.
>>>>>>
>>>>>> Btw. you don't need to convert the JSON plan output. You can paste it
>>>>>> into the plan visualizer [3].
>>>>>> I would not worry about the missing statistics. The optimizer does
>>>>>> not leverage them at the current state.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>>>>>> setup/config.html#managed-memory
>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>>>>>> dev/execution_configuration.html
>>>>>> [3] http://flink.apache.org/visualizer/
>>>>>>
>>>>>> 2017-12-06 16:45 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>>>>
>>>>>>> Fabian,
>>>>>>>
>>>>>>>  Thank you for the reply.  Yes I do watch via the ui, is there
>>>>>>> another way to see progress through the steps?
>>>>>>>
>>>>>>> I think I just figured it out, the hangup is in the sort phase (ID
>>>>>>> 4) where 2 slots take all the time.  Looking in the UI most slots get 
>>>>>>> less
>>>>>>> than 500MB of data to sort, these two have 6.7GB and 7.3GB each, 
>>>>>>> together
>>>>>>> its about 272M records and these will run for hours at this point.  
>>>>>>> Looks
>>>>>>> like I need to figure out a different partitioning/sort strategy. I 
>>>>>>> never
>>>>>>> noticed before because when I run the system at ~1400 slots I don't use 
>>>>>>> the
>>>>>>> UI anymore as its gets unresponsive.  400 Slots is painfully slow, but
>>>>>>> still works.
>>>>>>>
>>>>>>>
>>>>>>> The getEnv output is very cool! Also very big, I've tried to
>>>>>>> summarize it here in more of a yaml format as its on a different 
>>>>>>> network.
>>>>>>> Note the parallelism was just set to 10 as I didn't know if that 
>>>>>>> effected
>>>>>>> output.  Hopefully I didn't flub a copy paste step, it looks good to me.
>>>>>>>
>>>>>>>
>>>>>>> ​This flow used to be far fewer steps, but as it wasn't scaling I
>>>>>>> broke it out into all the distinct pieces so I could see where it 
>>>>>>> failed.​
>>>>>>> Source and sink are both Hive tables.  I wonder if the inputformat is
>>>>>>> expected to give more info to seed some of these stat values?
>>>>>>>
>>>>>>> ​nodes
>>>>>>>     id: 6
>>>>>>>     type: source
>>>>>>>     pact: Data Source
>>>>>>>     contents: at CreateInput(ExecutionEnvironment.java:533)
>>>>>>>     parallelism: 10
>>>>>>>     global_properties:
>>>>>>>         name: partitioning v: RANDOM_PARTITIONED
>>>>>>>         name: Partitioning Order value: none
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     local_properties:
>>>>>>>         name: Order value: none
>>>>>>>         name: Grouping value: not grouped
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     estimates:
>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>     costs:
>>>>>>>         name: Network value: 0
>>>>>>>         name: Disk I/O value 0
>>>>>>>         name: CPU value: 0
>>>>>>>         name: Cumulative Network value: 0
>>>>>>>         name: Cumulative Disk I/O value: 0
>>>>>>>         name: Cumulative CPU value: 0
>>>>>>>     compiler_hints:
>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>         name: Output Cardinality value: none
>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>         name: Filter Factor value: none
>>>>>>>
>>>>>>>     id: 5
>>>>>>>     type: pact
>>>>>>>     pact: FlatMap
>>>>>>>     contents: FlatMap at main()
>>>>>>>     parallelism: 10
>>>>>>>     predecessors:
>>>>>>>         id: 6, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>>>     driver_strategy: FlatMap
>>>>>>>     global_properties:
>>>>>>>         name: partitioning v: RANDOM_PARTITIONED
>>>>>>>         name: Partitioning Order value: none
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     local_properties:
>>>>>>>         name: Order value: none
>>>>>>>         name: Grouping value: not grouped
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     estimates:
>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>     costs:
>>>>>>>         name: Network value: 0
>>>>>>>         name: Disk I/O value 0
>>>>>>>         name: CPU value: 0
>>>>>>>         name: Cumulative Network value: 0
>>>>>>>         name: Cumulative Disk I/O value: 0
>>>>>>>         name: Cumulative CPU value: 0
>>>>>>>     compiler_hints:
>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>         name: Output Cardinality value: none
>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>         name: Filter Factor value: none
>>>>>>>
>>>>>>>     id: 4
>>>>>>>     type: pact
>>>>>>>     pact: Sort-Partition
>>>>>>>     contents: Sort at main()
>>>>>>>     parallelism: 10
>>>>>>>     predecessors:
>>>>>>>         id: 5, ship_strategy: Hash Partition on [0,2]
>>>>>>> local_strategy: Sort on [0:ASC,2:ASC,1:ASC], exchange_mode: PIPELINED
>>>>>>>     driver_strategy: No-Op
>>>>>>>     global_properties:
>>>>>>>         name: partitioning v: HASH_PARTITIONED
>>>>>>>         name: Partitioned on value: [0,2]
>>>>>>>         name: Partitioning Order value: none
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     local_properties:
>>>>>>>         name: Order value: [0:ASC,2:ASC,1:ASC]
>>>>>>>         name: Grouping value: [0,2,1]
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     estimates:
>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>     costs:
>>>>>>>         name: Network value: 0
>>>>>>>         name: Disk I/O value 0
>>>>>>>         name: CPU value: 0
>>>>>>>         name: Cumulative Network value: unknown
>>>>>>>         name: Cumulative Disk I/O value: unknown
>>>>>>>         name: Cumulative CPU value: unknown
>>>>>>>     compiler_hints:
>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>         name: Output Cardinality value: none
>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>         name: Filter Factor value: none
>>>>>>>
>>>>>>>     id: 3
>>>>>>>     type: pact
>>>>>>>     pact: GroupReduce
>>>>>>>     contents: GroupReduce at first(SortedGrouping.java:210)
>>>>>>>     parallelism: 10
>>>>>>>     predecessors:
>>>>>>>         id: 4, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>>>     driver_strategy: Sorted Group Reduce
>>>>>>>     global_properties:
>>>>>>>         name: partitioning v: RANDOM_PARTITIONED
>>>>>>>         name: Partitioning Order value: none
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     local_properties:
>>>>>>>         name: Order value: none
>>>>>>>         name: Grouping value: not grouped
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     estimates:
>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>     costs:
>>>>>>>         name: Network value: 0
>>>>>>>         name: Disk I/O value 0
>>>>>>>         name: CPU value: 0
>>>>>>>         name: Cumulative Network value: unknown
>>>>>>>         name: Cumulative Disk I/O value: unknown
>>>>>>>         name: Cumulative CPU value: unknown
>>>>>>>     compiler_hints:
>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>         name: Output Cardinality value: none
>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>         name: Filter Factor value: none
>>>>>>>
>>>>>>>
>>>>>>>     id: 2
>>>>>>>     type: pact
>>>>>>>     pact: Map
>>>>>>>     contents: Map at ()
>>>>>>>     parallelism: 10
>>>>>>>     predecessors:
>>>>>>>         id: 3, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>>>     driver_strategy: Map
>>>>>>>     global_properties:
>>>>>>>         name: partitioning v: RANDOM_PARTITIONED
>>>>>>>         name: Partitioning Order value: none
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     local_properties:
>>>>>>>         name: Order value: none
>>>>>>>         name: Grouping value: not grouped
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     estimates:
>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>     costs:
>>>>>>>         name: Network value: 0
>>>>>>>         name: Disk I/O value 0
>>>>>>>         name: CPU value: 0
>>>>>>>         name: Cumulative Network value: unknown
>>>>>>>         name: Cumulative Disk I/O value: unknown
>>>>>>>         name: Cumulative CPU value: unknown
>>>>>>>     compiler_hints:
>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>         name: Output Cardinality value: none
>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>         name: Filter Factor value: none
>>>>>>>
>>>>>>>     id: 1
>>>>>>>     type: pact
>>>>>>>     pact: Map
>>>>>>>     contents: map at main()
>>>>>>>     parallelism: 10
>>>>>>>     predecessors:
>>>>>>>         id: 2, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>>>     driver_strategy: Map
>>>>>>>     global_properties:
>>>>>>>         name: partitioning v: RANDOM_PARTITIONED
>>>>>>>         name: Partitioning Order value: none
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     local_properties:
>>>>>>>         name: Order value: none
>>>>>>>         name: Grouping value: not grouped
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     estimates:
>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>     costs:
>>>>>>>         name: Network value: 0
>>>>>>>         name: Disk I/O value 0
>>>>>>>         name: CPU value: 0
>>>>>>>         name: Cumulative Network value: unknown
>>>>>>>         name: Cumulative Disk I/O value: unknown
>>>>>>>         name: Cumulative CPU value: unknown
>>>>>>>     compiler_hints:
>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>         name: Output Cardinality value: none
>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>         name: Filter Factor value: none
>>>>>>>
>>>>>>>     id: 0
>>>>>>>     type: sink
>>>>>>>     pact: Data Sink
>>>>>>>     contents: org.apache.flink.api.java.jado
>>>>>>> op.mapreduce.HadoopOutputFormat
>>>>>>>     parallelism: 10
>>>>>>>     predecessors:
>>>>>>>         id: 1, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>>>     driver_strategy: Map
>>>>>>>     global_properties:
>>>>>>>         name: partitioning v: RANDOM_PARTITIONED
>>>>>>>         name: Partitioning Order value: none
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     local_properties:
>>>>>>>         name: Order value: none
>>>>>>>         name: Grouping value: not grouped
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     estimates:
>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>     costs:
>>>>>>>         name: Network value: 0
>>>>>>>         name: Disk I/O value 0
>>>>>>>         name: CPU value: 0
>>>>>>>         name: Cumulative Network value: unknown
>>>>>>>         name: Cumulative Disk I/O value: unknown
>>>>>>>         name: Cumulative CPU value: unknown
>>>>>>>     compiler_hints:
>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>         name: Output Cardinality value: none
>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>         name: Filter Factor value: none​
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Dec 5, 2017 at 5:36 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Flink's operators are designed to work in memory as long as
>>>>>>>> possible and spill to disk once the memory budget is exceeded.
>>>>>>>> Moreover, Flink aims to run programs in a pipelined fashion, such
>>>>>>>> that multiple operators can process data at the same time.
>>>>>>>> This behavior can make it a bit tricky to analyze the runtime
>>>>>>>> behavior and progress of operators.
>>>>>>>>
>>>>>>>> It would be interesting to have a look at the execution plan for
>>>>>>>> the program that you are running.
>>>>>>>> The plan can be obtained from the ExecutionEnvironment by calling
>>>>>>>> env.getExecutionPlan() instead of env.execute().
>>>>>>>>
>>>>>>>> I would also like to know how you track the progress of the
>>>>>>>> program.
>>>>>>>> Are you looking at the record counts displayed in the WebUI?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Fabian
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 2017-12-05 22:03 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com
>>>>>>>> >:
>>>>>>>>
>>>>>>>>> I have been moving some old MR and hive workflows into Flink
>>>>>>>>> because I'm enjoying the api's and the ease of development is 
>>>>>>>>> wonderful.
>>>>>>>>> Things have largely worked great until I tried to really scale some 
>>>>>>>>> of the
>>>>>>>>> jobs recently.
>>>>>>>>>
>>>>>>>>> I have for example one etl job that reads in about 12B records at
>>>>>>>>> a time and does a sort, some simple transformations, validation, a
>>>>>>>>> re-partition and then output to a hive table.
>>>>>>>>> When I built it with the sample set, ~200M, it worked great, took
>>>>>>>>> maybe a minute and blew threw it.
>>>>>>>>>
>>>>>>>>> What I have observed is there is some kind of saturation reached
>>>>>>>>> depending on number of slots, number of nodes and the overall size of 
>>>>>>>>> data
>>>>>>>>> to move.  When I run the 12B set, the first 1B go through in under 1
>>>>>>>>> minute, really really fast.  But its an extremely sharp drop off after
>>>>>>>>> that, the next 1B might take 15 minutes, and then if I wait for the 
>>>>>>>>> next
>>>>>>>>> 1B, its well over an hour.
>>>>>>>>>
>>>>>>>>> What I cant find is any obvious indicators or things to look at,
>>>>>>>>> everything just grinds to a halt, I don't think the job would ever 
>>>>>>>>> actually
>>>>>>>>> complete.
>>>>>>>>>
>>>>>>>>> Is there something in the design of flink in batch mode that is
>>>>>>>>> perhaps memory bound?  Adding more nodes/tasks does not fix it, just 
>>>>>>>>> gets
>>>>>>>>> me a little further along.  I'm already running around ~1,400 slots 
>>>>>>>>> at this
>>>>>>>>> point, I'd postulate needing 10,000+ to potentially make the job run, 
>>>>>>>>> but
>>>>>>>>> thats too much of my cluster gone, and I have yet to get flink to be 
>>>>>>>>> stable
>>>>>>>>> past 1,500.
>>>>>>>>>
>>>>>>>>> Any idea's on where to look, or what to debug?  GUI is also very
>>>>>>>>> cumbersome to use at this slot count too, so other measurement ideas 
>>>>>>>>> are
>>>>>>>>> welcome too!
>>>>>>>>>
>>>>>>>>> Thank you all.
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to