Fabian,

 Thank you for the reply.  Yes I do watch via the ui, is there another way
to see progress through the steps?

I think I just figured it out, the hangup is in the sort phase (ID 4) where
2 slots take all the time.  Looking in the UI most slots get less than
500MB of data to sort, these two have 6.7GB and 7.3GB each, together its
about 272M records and these will run for hours at this point.  Looks like
I need to figure out a different partitioning/sort strategy. I never
noticed before because when I run the system at ~1400 slots I don't use the
UI anymore as its gets unresponsive.  400 Slots is painfully slow, but
still works.


The getEnv output is very cool! Also very big, I've tried to summarize it
here in more of a yaml format as its on a different network.  Note the
parallelism was just set to 10 as I didn't know if that effected output.
Hopefully I didn't flub a copy paste step, it looks good to me.


​This flow used to be far fewer steps, but as it wasn't scaling I broke it
out into all the distinct pieces so I could see where it failed.​  Source
and sink are both Hive tables.  I wonder if the inputformat is expected to
give more info to seed some of these stat values?

​nodes
    id: 6
    type: source
    pact: Data Source
    contents: at CreateInput(ExecutionEnvironment.java:533)
    parallelism: 10
    global_properties:
        name: partitioning v: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value 0
        name: CPU value: 0
        name: Cumulative Network value: 0
        name: Cumulative Disk I/O value: 0
        name: Cumulative CPU value: 0
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 5
    type: pact
    pact: FlatMap
    contents: FlatMap at main()
    parallelism: 10
    predecessors:
        id: 6, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: FlatMap
    global_properties:
        name: partitioning v: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value 0
        name: CPU value: 0
        name: Cumulative Network value: 0
        name: Cumulative Disk I/O value: 0
        name: Cumulative CPU value: 0
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 4
    type: pact
    pact: Sort-Partition
    contents: Sort at main()
    parallelism: 10
    predecessors:
        id: 5, ship_strategy: Hash Partition on [0,2] local_strategy: Sort
on [0:ASC,2:ASC,1:ASC], exchange_mode: PIPELINED
    driver_strategy: No-Op
    global_properties:
        name: partitioning v: HASH_PARTITIONED
        name: Partitioned on value: [0,2]
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: [0:ASC,2:ASC,1:ASC]
        name: Grouping value: [0,2,1]
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 3
    type: pact
    pact: GroupReduce
    contents: GroupReduce at first(SortedGrouping.java:210)
    parallelism: 10
    predecessors:
        id: 4, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: Sorted Group Reduce
    global_properties:
        name: partitioning v: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none


    id: 2
    type: pact
    pact: Map
    contents: Map at ()
    parallelism: 10
    predecessors:
        id: 3, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: Map
    global_properties:
        name: partitioning v: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 1
    type: pact
    pact: Map
    contents: map at main()
    parallelism: 10
    predecessors:
        id: 2, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: Map
    global_properties:
        name: partitioning v: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none

    id: 0
    type: sink
    pact: Data Sink
    contents: org.apache.flink.api.java.jadoop.mapreduce.HadoopOutputFormat
    parallelism: 10
    predecessors:
        id: 1, ship_strategy: Forward, exchange_mode: PIPELINED
    driver_strategy: Map
    global_properties:
        name: partitioning v: RANDOM_PARTITIONED
        name: Partitioning Order value: none
        name: Uniqueness value: not unique
    local_properties:
        name: Order value: none
        name: Grouping value: not grouped
        name: Uniqueness value: not unique
    estimates:
        name: Est. Output Size value: unknown
        name: Est Cardinality value: unknown
    costs:
        name: Network value: 0
        name: Disk I/O value 0
        name: CPU value: 0
        name: Cumulative Network value: unknown
        name: Cumulative Disk I/O value: unknown
        name: Cumulative CPU value: unknown
    compiler_hints:
        name: Output Size (bytes) value: none
        name: Output Cardinality value: none
        name: Avg. Output Record Size (bytes) value: none
        name: Filter Factor value: none​




On Tue, Dec 5, 2017 at 5:36 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> Flink's operators are designed to work in memory as long as possible and
> spill to disk once the memory budget is exceeded.
> Moreover, Flink aims to run programs in a pipelined fashion, such that
> multiple operators can process data at the same time.
> This behavior can make it a bit tricky to analyze the runtime behavior and
> progress of operators.
>
> It would be interesting to have a look at the execution plan for the
> program that you are running.
> The plan can be obtained from the ExecutionEnvironment by calling
> env.getExecutionPlan() instead of env.execute().
>
> I would also like to know how you track the progress of the program.
> Are you looking at the record counts displayed in the WebUI?
>
> Best,
> Fabian
>
>
>
> 2017-12-05 22:03 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>
>> I have been moving some old MR and hive workflows into Flink because I'm
>> enjoying the api's and the ease of development is wonderful.  Things have
>> largely worked great until I tried to really scale some of the jobs
>> recently.
>>
>> I have for example one etl job that reads in about 12B records at a time
>> and does a sort, some simple transformations, validation, a re-partition
>> and then output to a hive table.
>> When I built it with the sample set, ~200M, it worked great, took maybe a
>> minute and blew threw it.
>>
>> What I have observed is there is some kind of saturation reached
>> depending on number of slots, number of nodes and the overall size of data
>> to move.  When I run the 12B set, the first 1B go through in under 1
>> minute, really really fast.  But its an extremely sharp drop off after
>> that, the next 1B might take 15 minutes, and then if I wait for the next
>> 1B, its well over an hour.
>>
>> What I cant find is any obvious indicators or things to look at,
>> everything just grinds to a halt, I don't think the job would ever actually
>> complete.
>>
>> Is there something in the design of flink in batch mode that is perhaps
>> memory bound?  Adding more nodes/tasks does not fix it, just gets me a
>> little further along.  I'm already running around ~1,400 slots at this
>> point, I'd postulate needing 10,000+ to potentially make the job run, but
>> thats too much of my cluster gone, and I have yet to get flink to be stable
>> past 1,500.
>>
>> Any idea's on where to look, or what to debug?  GUI is also very
>> cumbersome to use at this slot count too, so other measurement ideas are
>> welcome too!
>>
>> Thank you all.
>>
>
>

Reply via email to