Re: Yarn run single job

2018-07-10 Thread Garrett Barton
AHH it works!  Never occurred to me that it meant to literally type in
yarn-cluster.

Thank you!

On Tue, Jul 10, 2018 at 11:17 AM Chesnay Schepler 
wrote:

> -m yarn-cluster switches the client into yarn mode.
>
> yarn-cluster is not a placeholder or anything, you have to literally type
> that in.
>
> On 10.07.2018 17:02, Garrett Barton wrote:
>
> Greetings all,
>  The docs say that I can skip creating a cluster and let the jobs create
> their own clusters on yarn.  The example given is:
>
> ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
>
>
> What I cannot figure out is what the -m option is meant for.  As far as I
> can tell there is no jobmanager to specify; I expect Flink to start one.
> Skipping the option doesn't work, as it defaults to the one in the config,
> which has a comment saying Flink manages it for YARN deployments.
>
> I tried pointing it at my yarn resource manager, it didn't like any of the
> ports.
>
> Any ideas?
>
>
>


Yarn run single job

2018-07-10 Thread Garrett Barton
Greetings all,
 The docs say that I can skip creating a cluster and let the jobs create
their own clusters on yarn.  The example given is:

./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar


What I cannot figure out is what the -m option is meant for.  As far as I
can tell there is no jobmanager to specify; I expect Flink to start one.
Skipping the option doesn't work, as it defaults to the one in the config,
which has a comment saying Flink manages it for YARN deployments.

I tried pointing it at my yarn resource manager, it didn't like any of the
ports.

Any ideas?


Re: Flink 1.5 Yarn Connection unexpectedly closed

2018-06-22 Thread Garrett Barton
I don't know why yet, but I did figure it out.  After my sample long-running
map/reduce test ran fine all night, I tried a ton of things.  Turns out there
is a difference between env.execute() and collect().

My flow reads from HDFS, decrypts, processes, and finally writes back to
HDFS; at each step, though, I was splitting the feed and counting stats to
save for later.  I was calling collect() on the unioned stat feeds to bring
them to the client and check the validity of the run before doing anything
else.  Looks like collect() was causing the disconnections.  When I switched
to writing the stats out to HDFS files and calling env.execute(), the flow
works fine now.
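
For reference, a minimal sketch of the change (the paths and stat names here
are made up; this assumes the 1.5 DataSet API):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class StatsSinkSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the unioned stat feeds described above.
        DataSet<Tuple2<String, Long>> stats = env.fromElements(
                Tuple2.of("recordsRead", 100L),
                Tuple2.of("recordsWritten", 98L));

        // Instead of stats.collect(), which pulls results back through the client,
        // write them to HDFS and let env.execute() drive the whole job on the cluster.
        stats.writeAsCsv("hdfs:///tmp/run-stats", WriteMode.OVERWRITE).setParallelism(1);

        env.execute("etl-with-stats");
    }
}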

Oh and thank you for the retry suggestion, I turned it on and watched the
job fail 3 times in a row with the same error.  So the retry stuff works
which is cool, and I'll use it from now on! (Btw, the docs need updating here
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/fault_tolerance.html
since that stuff's deprecated!)
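
For anyone else reading, a minimal sketch of turning the retry behavior on via
a restart strategy (the attempt count and delay are arbitrary values):

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;

public class RestartStrategySketch {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // With a restart strategy configured, a failure triggers a retry instead
        // of terminally failing the job: here up to 3 attempts, 10 seconds apart.
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    }
}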


Thank you all as always for being so responsive!

On Fri, Jun 22, 2018 at 5:26 AM Till Rohrmann  wrote:

> Hi Garrett,
>
> have you set a restart strategy for your job [1]? In order to recover from
> failures you need to specify one. Otherwise Flink will terminally fail the
> job in case of a failure.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/restart_strategies.html
>
> Cheers,
> Till
>
> On Thu, Jun 21, 2018 at 7:43 PM Garrett Barton 
> wrote:
>
>> Actually, random thought, could yarn preemption be causing this?  What is
>> the failure scenario should a working task manager go down in yarn that is
>> doing real work?  The docs make it sound like it should fire up another TM
>> and get back to work out of the box, but I'm not seeing that.
>>
>>
>> On Thu, Jun 21, 2018 at 1:20 PM Garrett Barton 
>> wrote:
>>
>>> Thank you all for the reply!
>>>
>>> I am running batch jobs, I read in a handful of files from HDFS and
>>> output to HBase, HDFS, and Kafka.  I run into this when I have partial
>>> usage of the cluster as the job runs.  So right now I spin up 20 nodes with
>>> 3 slots, my job at peak uses all 60 slots, but by the end of it since my
>>> outputs are all forced parallel 1 while I work out kinks, that all
>>> typically ends up running in 1 or two task managers tops.  The other 18-19
>>> task managers die off.  Problem is as soon as any task manager dies off, my
>>> client throws the above exception and the job fails.
>>>
>>> I cannot share logs, but I was thinking about writing a dirt simple
>>> mapreduce flow based on the wordcount example.  The example would have a
>>> wide map phase that generates data, and then I'd run it through a reducer
>>> that sleeps maybe 1 second every record.  I believe that will simulate my
>>> condition very well where I go from 100% used slots to only 1-2 used slots
>>> as I hit that timeout.  I'll do that today and let you know, if it works I
>>> can share the code in here as an example.
>>>
>>> On Thu, Jun 21, 2018 at 5:01 AM Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Garrett,
>>>>
>>>> killing of idle TaskManager should not affect the execution of the job.
>>>> By definition a TaskManager only idles if it does not execute any tasks.
>>>> Could you maybe share the complete logs (of the cluster entrypoint and all
>>>> TaskManagers) with us?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Jun 21, 2018 at 10:26 AM Fabian Hueske 
>>>> wrote:
>>>>
>>>>> Hi Garrett,
>>>>>
>>>>> I agree, there seems to be an issue and increasing the timeout should
>>>>> not be the right approach to solve it.
>>>>> Are you running streaming or batch jobs, i.e., do some of the tasks
>>>>> finish much earlier than others?
>>>>>
>>>>> I'm adding Till to this thread who's very familiar with scheduling and
>>>>> process communication.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2018-06-19 0:03 GMT+02:00 Garrett Barton :
>>>>>
>>>>>> Hey all,
>>>>>>
>>>>>>  My jobs that I am trying to write in Flink 1.5 are failing after a
>>>>>> few minutes.  I think its because the idle task managers are shutting 
>>>>>> down,
>>>>>> which seems to kill the client and the running job. The running job 
>

Re: Flink 1.5 Yarn Connection unexpectedly closed

2018-06-21 Thread Garrett Barton
Actually, random thought: could YARN preemption be causing this?  What is
the failure scenario if a working task manager that is doing real work goes
down in YARN?  The docs make it sound like it should fire up another TM and
get back to work out of the box, but I'm not seeing that.


On Thu, Jun 21, 2018 at 1:20 PM Garrett Barton 
wrote:

> Thank you all for the reply!
>
> I am running batch jobs, I read in a handful of files from HDFS and output
> to HBase, HDFS, and Kafka.  I run into this when I have partial usage of
> the cluster as the job runs.  So right now I spin up 20 nodes with 3 slots,
> my job at peak uses all 60 slots, but by the end of it since my outputs are
> all forced parallel 1 while I work out kinks, that all typically ends up
> running in 1 or two task managers tops.  The other 18-19 task managers die
> off.  Problem is as soon as any task manager dies off, my client throws the
> above exception and the job fails.
>
> I cannot share logs, but I was thinking about writing a dirt simple
> mapreduce flow based on the wordcount example.  The example would have a
> wide map phase that generates data, and then I'd run it through a reducer
> that sleeps maybe 1 second every record.  I believe that will simulate my
> condition very well where I go from 100% used slots to only 1-2 used slots
> as I hit that timeout.  I'll do that today and let you know, if it works I
> can share the code in here as an example.
>
> On Thu, Jun 21, 2018 at 5:01 AM Till Rohrmann 
> wrote:
>
>> Hi Garrett,
>>
>> killing of idle TaskManager should not affect the execution of the job.
>> By definition a TaskManager only idles if it does not execute any tasks.
>> Could you maybe share the complete logs (of the cluster entrypoint and all
>> TaskManagers) with us?
>>
>> Cheers,
>> Till
>>
>> On Thu, Jun 21, 2018 at 10:26 AM Fabian Hueske  wrote:
>>
>>> Hi Garrett,
>>>
>>> I agree, there seems to be an issue and increasing the timeout should
>>> not be the right approach to solve it.
>>> Are you running streaming or batch jobs, i.e., do some of the tasks
>>> finish much earlier than others?
>>>
>>> I'm adding Till to this thread who's very familiar with scheduling and
>>> process communication.
>>>
>>> Best, Fabian
>>>
>>> 2018-06-19 0:03 GMT+02:00 Garrett Barton :
>>>
>>>> Hey all,
>>>>
>>>>  My jobs that I am trying to write in Flink 1.5 are failing after a few
>>>> minutes.  I think its because the idle task managers are shutting down,
>>>> which seems to kill the client and the running job. The running job itself
>>>> was still going on one of the other task managers.  I get:
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException:
>>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>>> Connection unexpectedly closed by remote task manager ''. This might
>>>> indicate that the remote task manager was lost.
>>>> at org.apache.flink.runtime.io
>>>> .network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:143)
>>>>
>>>> Now I happen to have the last part of the flow paralleled to 1 right
>>>> now for debugging, so the 4 task managers that are spun up, 3 of them hit
>>>> the timeout period (currently set to 24).  I think as soon as the first
>>>> one goes the client throws up and the whole job dies as a result.
>>>>
>>>>  Is this expected behavior and if so, is there another way around it?
>>>> Do I keep increasing the slotmanager.taskmanager-timeout to a really really
>>>> large number? I have verified setting the timeout to 84 lets the job
>>>> complete without error.
>>>>
>>>> Thank you!
>>>>
>>>
>>>


Re: Flink 1.5 Yarn Connection unexpectedly closed

2018-06-21 Thread Garrett Barton
Thank you all for the reply!

I am running batch jobs: I read in a handful of files from HDFS and output
to HBase, HDFS, and Kafka.  I run into this when I have partial usage of
the cluster as the job runs.  Right now I spin up 20 nodes with 3 slots
each; my job at peak uses all 60 slots, but since my outputs are all forced
to parallelism 1 while I work out kinks, by the end it typically all runs in
one or two task managers.  The other 18-19 task managers die off, and the
problem is that as soon as any task manager dies off, my client throws the
above exception and the job fails.

I cannot share logs, but I was thinking about writing a dirt simple
mapreduce flow based on the wordcount example.  The example would have a
wide map phase that generates data, and then I'd run it through a reducer
that sleeps maybe 1 second every record.  I believe that will simulate my
condition very well where I go from 100% used slots to only 1-2 used slots
as I hit that timeout.  I'll do that today and let you know; if it works, I
can share the code here as an example.
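
A rough sketch of that test job (the sizes and output path are made up; the
sleeping, parallelism-1 reducer stands in for my slow final stage):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class IdleTaskManagerRepro {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Wide map phase: every slot generates some records.
        DataSet<Tuple2<Long, Long>> generated = env.generateSequence(1, 1_000)
                .flatMap(new FlatMapFunction<Long, Tuple2<Long, Long>>() {
                    @Override
                    public void flatMap(Long value, Collector<Tuple2<Long, Long>> out) {
                        for (long i = 0; i < 100; i++) {
                            out.collect(Tuple2.of(value % 10, i));
                        }
                    }
                });

        // Slow, parallelism-1 reducer: while it runs, most TaskManagers sit idle
        // long enough to hit the slotmanager timeout.  Tune the record counts and
        // sleep so the reduce outlasts the timeout being tested.
        generated.groupBy(0)
                .reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Long>() {
                    @Override
                    public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Long> out)
                            throws Exception {
                        long count = 0;
                        for (Tuple2<Long, Long> ignored : values) {
                            count++;
                            Thread.sleep(1000); // roughly "sleep 1 second every record"
                        }
                        out.collect(count);
                    }
                })
                .setParallelism(1)
                .writeAsText("hdfs:///tmp/idle-tm-repro");

        env.execute("idle-taskmanager-repro");
    }
}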

On Thu, Jun 21, 2018 at 5:01 AM Till Rohrmann  wrote:

> Hi Garrett,
>
> killing of idle TaskManager should not affect the execution of the job. By
> definition a TaskManager only idles if it does not execute any tasks. Could
> you maybe share the complete logs (of the cluster entrypoint and all
> TaskManagers) with us?
>
> Cheers,
> Till
>
> On Thu, Jun 21, 2018 at 10:26 AM Fabian Hueske  wrote:
>
>> Hi Garrett,
>>
>> I agree, there seems to be an issue and increasing the timeout should not
>> be the right approach to solve it.
>> Are you running streaming or batch jobs, i.e., do some of the tasks
>> finish much earlier than others?
>>
>> I'm adding Till to this thread who's very familiar with scheduling and
>> process communication.
>>
>> Best, Fabian
>>
>> 2018-06-19 0:03 GMT+02:00 Garrett Barton :
>>
>>> Hey all,
>>>
>>>  My jobs that I am trying to write in Flink 1.5 are failing after a few
>>> minutes.  I think its because the idle task managers are shutting down,
>>> which seems to kill the client and the running job. The running job itself
>>> was still going on one of the other task managers.  I get:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException:
>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>> Connection unexpectedly closed by remote task manager ''. This might
>>> indicate that the remote task manager was lost.
>>> at org.apache.flink.runtime.io
>>> .network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:143)
>>>
>>> Now I happen to have the last part of the flow paralleled to 1 right now
>>> for debugging, so the 4 task managers that are spun up, 3 of them hit the
>>> timeout period (currently set to 24).  I think as soon as the first one
>>> goes the client throws up and the whole job dies as a result.
>>>
>>>  Is this expected behavior and if so, is there another way around it? Do
>>> I keep increasing the slotmanager.taskmanager-timeout to a really really
>>> large number? I have verified setting the timeout to 84 lets the job
>>> complete without error.
>>>
>>> Thank you!
>>>
>>
>>


Flink 1.5 Yarn Connection unexpectedly closed

2018-06-18 Thread Garrett Barton
Hey all,

 My jobs that I am trying to write in Flink 1.5 are failing after a few
minutes.  I think it's because the idle task managers are shutting down,
which seems to kill the client and the running job. The running job itself
was still going on one of the other task managers.  I get:

org.apache.flink.client.program.ProgramInvocationException:
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Connection unexpectedly closed by remote task manager ''. This might
indicate that the remote task manager was lost.
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:143)

Now I happen to have the last part of the flow set to parallelism 1 right
now for debugging, so of the 4 task managers that are spun up, 3 of them hit
the timeout period (currently set to 24).  I think as soon as the first one
goes, the client throws up and the whole job dies as a result.

 Is this expected behavior and if so, is there another way around it? Do I
keep increasing the slotmanager.taskmanager-timeout to a really really
large number? I have verified setting the timeout to 84 lets the job
complete without error.

Thank you!


Re: Flink Batch Performance degradation at scale

2017-12-07 Thread Garrett Barton
Running with these settings:
yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120
akka.ask.timeout: 60s
containerized.heap-cutoff-ratio: 0.15
taskmanager.memory.fraction: 0.7
taskmanager.memory.off-heap: false
taskmanager.memory.preallocate: true
env.getConfig().setExecutionMode(ExecutionMode.BATCH)

Looks like it's running a little faster than with the original settings; the
sort is not causing an OOM at least.


What do you mean by no direct memory buffer?  The taskmanagers seem to
report the correct capacity under the Outside JVM section.

Was googling around and ran into this:
https://github.com/netty/netty/issues/6813
It seemed promising, but I don't see -XX:+DisableExplicitGC being added
anywhere in the YARN launch_container.sh.


On Thu, Dec 7, 2017 at 12:39 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Ah, no direct memory buffer...
> Can you try to disable off-heap memory?
>
> 2017-12-07 18:35 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>
>> Stacktrace generates every time with the following settings (tried
>> different memory fractions):
>> yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120
>> akka.ask.timeout: 60s
>> containerized.heap-cutoff-ratio: 0.15
>> taskmanager.memory.fraction: 0.7/0.3/0.1
>> taskmanager.memory.off-heap: true
>> taskmanager.memory.preallocate: true
>> env.getConfig().setExecutionMode(ExecutionMode.BATCH)
>>
>> Hand Jammed top of the stack:
>> java.lang.RuntimeException: Error obtaining the sorted input: Thread
>> 'SortMerger Reading Thread' terminated due to an exception:
>> java.lang.OutOfMemoryError: Direct buffer memory
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> .getInterator(UnilateralSortMerger.java:619)
>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>> ask.java:1095)
>> at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:82)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>> k.java:355)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.IOException: Thread' terminated due to an exception:
>> java.lang.OutOfMemoryError: Direct buffer memory
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> $ThreadBase.run(UnilateralSortMerger.java:800)
>> Caused by: 
>> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>> java.lang.OutOfMemoryError: Direct buffer memory
>> at org.apache.flink.runtime.io.network.netty.PartitionRequestCl
>> ientHandler.exceptionCaught(PartitionRequestClientHandler.java:149)
>> ... lots of netty stuffs
>>
>>
>> While I observe the taskmanagers I never see their JVM heaps get high at
>> all.  Mind you I cant tell which task will blow and then see its TM in time
>> to see what it looks like.  But each one I do look at the heap usage is
>> ~150MB/6.16GB (with fraction: 0.1)
>>
>> On Thu, Dec 7, 2017 at 11:59 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hmm, the OOM sounds like a bug to me. Can you provide the stacktrace?
>>> The managed memory should be divided among all possible consumers. In
>>> case of your simple job, this should just be Sorter.
>>> In fact, I'd try to reduce the fraction to give more memory to the JVM
>>> heap (OOM means there was not enough (heap) memory).
>>>
>>> Enabling BATCH mode means that the records are not shipped to the sorter
>>> in a pipelined fashion but buffered at (and written to the disk of) the
>>> sender task.
>>> Once the input was consumed, the data is shipped to the receiver tasks
>>> (the sorter). This mode decouples tasks and also reduces the number of
>>> network buffers because fewer connection must be active at the same time.+
>>> Here's a link to an internal design document (not sure how up to date it
>>> is though...) [1].
>>>
>>> Did you try to check if the problem is cause by data skew?
>>> You could add a MapPartition tasks instead of the PartitionSorter to
>>> count the number of records per partition.
>>>
>>> Best, Fabian
>>>
>>> [1] https://cwiki.apache.org/confluence/display/FLINK/Data+excha
>>> nge+between+tasks
>>>
>>> 2017-12-07 16:30 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>
>>>> Thanks for the reply again,
>>>>
>>>>  I'm currently doing runs with:
>>>> yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120
>>>> akka.ask.timeout: 60

Re: Flink Batch Performance degradation at scale

2017-12-07 Thread Garrett Barton
Stacktrace generates every time with the following settings (tried
different memory fractions):
yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120
akka.ask.timeout: 60s
containerized.heap-cutoff-ratio: 0.15
taskmanager.memory.fraction: 0.7/0.3/0.1
taskmanager.memory.off-heap: true
taskmanager.memory.preallocate: true
env.getConfig().setExecutionMode(ExecutionMode.BATCH)

Hand Jammed top of the stack:
java.lang.RuntimeException: Error obtaining the sorted input: Thread
'SortMerger Reading Thread' terminated due to an exception:
java.lang.OutOfMemoryError: Direct buffer memory
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getInterator(UnilateralSortMerger.java:619)
at
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:82)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Thread' terminated due to an exception:
java.lang.OutOfMemoryError: Direct buffer memory
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by:
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
java.lang.OutOfMemoryError: Direct buffer memory
at
org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:149)
... lots of netty stuffs


While I observe the taskmanagers I never see their JVM heaps get high at
all.  Mind you, I can't tell which task will blow up and then find its TM in
time to see what it looks like.  But for each one I do look at, the heap
usage is ~150MB/6.16GB (with fraction: 0.1).

On Thu, Dec 7, 2017 at 11:59 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hmm, the OOM sounds like a bug to me. Can you provide the stacktrace?
> The managed memory should be divided among all possible consumers. In case
> of your simple job, this should just be Sorter.
> In fact, I'd try to reduce the fraction to give more memory to the JVM
> heap (OOM means there was not enough (heap) memory).
>
> Enabling BATCH mode means that the records are not shipped to the sorter
> in a pipelined fashion but buffered at (and written to the disk of) the
> sender task.
> Once the input was consumed, the data is shipped to the receiver tasks
> (the sorter). This mode decouples tasks and also reduces the number of
> network buffers because fewer connections must be active at the same time.
> Here's a link to an internal design document (not sure how up to date it
> is though...) [1].
>
> Did you try to check if the problem is caused by data skew?
> You could add a MapPartition task instead of the PartitionSorter to count
> the number of records per partition.
>
> Best, Fabian
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/Data+
> exchange+between+tasks
>
> 2017-12-07 16:30 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>
>> Thanks for the reply again,
>>
>>  I'm currently doing runs with:
>> yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120
>> akka.ask.timeout: 60s
>> containerized.heap-cutoff-ratio: 0.15
>> taskmanager.memory.fraction: 0.7
>> taskmanager.memory.off-heap: true
>> taskmanager.memory.preallocate: true
>>
>> When I change the config setExecutionMode() to BATCH, no matter what
>> memory fraction I choose the sort instantly fails with SortMerger OOM
>> exceptions.  Even when I set fraction to 0.95.  The data source part is
>> ridiculously fast though, ~30 seconds!  Disabling batch mode and keeping
>> the other changes looks like to do the same behavior as before, jobs been
>> running for ~20 minutes now.  Does Batch mode disable spilling to disk, or
>> does batch with a combo of off heap disable spilling to disk?  Is there
>> more documentation on what Batch mode does under the covers?
>>
>> As for the flow itself, yes it used to be a lot smaller, I broke it out
>> manually by adding the sort/partition to see which steps were causing me
>> the slowdown, thinking it was my code, I wanted to separate the operations.
>>
>> Thank you again for your help.
>>
>> On Thu, Dec 7, 2017 at 4:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> That doesn't look like a bad configuration.
>>>
>>> I have to correct myself regarding the size of the managed memory. The
>>> fraction (70%) is applied on the free memory after the TM initialization.
>>> This means that memory for network buffers (and other data structures) are
>>> subtracted before

Re: Flink Batch Performance degradation at scale

2017-12-07 Thread Garrett Barton
Thanks for the reply again,

 I'm currently doing runs with:
yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120
akka.ask.timeout: 60s
containerized.heap-cutoff-ratio: 0.15
taskmanager.memory.fraction: 0.7
taskmanager.memory.off-heap: true
taskmanager.memory.preallocate: true

When I change the config setExecutionMode() to BATCH, no matter what memory
fraction I choose the sort instantly fails with SortMerger OOM exceptions,
even when I set the fraction to 0.95.  The data source part is ridiculously
fast though, ~30 seconds!  Disabling batch mode and keeping the other
changes seems to behave the same as before; the job has been running for
~20 minutes now.  Does BATCH mode disable spilling to disk, or does BATCH
combined with off-heap memory disable spilling to disk?  Is there more
documentation on what BATCH mode does under the covers?

As for the flow itself, yes, it used to be a lot smaller.  I broke it out
manually by adding the sort/partition to see which steps were causing the
slowdown; thinking it was my code, I wanted to separate the operations.

Thank you again for your help.
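
For reference, a minimal sketch of the pre-aggregation idea Fabian mentions
below, assuming a simple (key, count) tuple flow:

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class PreAggregationSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the real keyed records.
        DataSet<Tuple2<String, Long>> records = env.fromElements(
                Tuple2.of("a", 1L), Tuple2.of("a", 1L), Tuple2.of("b", 1L));

        // combineGroup runs a partial, local aggregation before the shuffle,
        // shrinking what the skewed keys push onto a single sorter/reducer.
        records.groupBy(0)
                .combineGroup(new GroupCombineFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
                    @Override
                    public void combine(Iterable<Tuple2<String, Long>> values,
                                        Collector<Tuple2<String, Long>> out) {
                        String key = null;
                        long sum = 0;
                        for (Tuple2<String, Long> v : values) {
                            key = v.f0;
                            sum += v.f1;
                        }
                        out.collect(Tuple2.of(key, sum));
                    }
                })
                .groupBy(0)
                .sum(1)
                .print();
    }
}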

On Thu, Dec 7, 2017 at 4:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> That doesn't look like a bad configuration.
>
> I have to correct myself regarding the size of the managed memory. The
> fraction (70%) is applied on the free memory after the TM initialization.
> This means that memory for network buffers (and other data structures) are
> subtracted before the managed memory is allocated.
> The actual size of the managed memory is logged in the TM log file during
> start up.
>
> You could also try to decrease the number of slots per TM to 1 but add
> more vCores (yarn.containers.vcores [1]) because the sorter runs in
> multiple threads.
>
> Adding a GroupCombineFunction for pre-aggregation (if possible...) would
> help to mitigate the effects of the data skew.
> Another thing I'd like to ask: Are you adding the partitioner and sorter
> explicitly to the plan and if so why? Usually, the partitioning and sorting
> is done as part of the GroupReduce.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/setup/config.html#yarn
>
> 2017-12-06 23:32 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>
>> Wow thank you for the reply, you gave me a lot to look into and mess
>> with. I'll start testing with the various memory options and env settings
>> tomorrow.
>>
>> BTW the current flink cluster is launched like:
>> yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120
>>
>> with flink-conf.yaml property overrides of:
>> # so bigger clusters don't fail to init
>> akka.ask.timeout: 60s
>> # so more memory is given to the JVM from the yarn container
>> containerized.heap-cutoff-ratio: 0.15
>>
>> So each flink slot doesn't necessarily get a lot of ram, you said 70% of
>> ram goes to the job by default, so that's (9200*0.85)*0.70 = 5474MB.  So
>> each slot is sitting with ~2737MB of usable space.  Would you have a
>> different config for taking overall the same amount of ram?
>>
>>
>>
>>
>> On Wed, Dec 6, 2017 at 11:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Garrett,
>>>
>>> data skew might be a reason for the performance degradation.
>>>
>>> The plan you shared is pretty simple. The following happens you run the
>>> program:
>>> - The data source starts to read data and pushes the records to the
>>> FlatMapFunction. From there the records are shuffed (using
>>> hash-partitioning) to the sorter.
>>> - The sorter tasks consume the records and write them into a memory
>>> buffer. When the buffer is full, it is sorted and spilled to disk. When the
>>> buffer was spilled, it is filled again with records, sorted, and spilled.
>>> - The initially fast processing happens because at the beginning the
>>> sorter is not waiting for buffers to be sorted or spilled because they are
>>> empty.
>>>
>>> The performance of the plan depends (among other things) on the size of
>>> the sort buffers. The sort buffers are taken from Flink's managed memory.
>>> Unless you configured something else, 70% of to the TaskManager heap
>>> memory is reserved as managed memory.
>>> If you use Flink only for batch jobs, I would enable preallocation and
>>> off-heap memory (see configuration options [1]). You can also configure a
>>> fixed size for the managed memory. The more memory you configure, the more
>>> is available for sorting.
>>>
>>> The managed memory of a TM is evenly distributed to all its processing
>>> slots. Hence, having more slots per TM 

Re: Flink Batch Performance degradation at scale

2017-12-06 Thread Garrett Barton
Wow thank you for the reply, you gave me a lot to look into and mess with.
I'll start testing with the various memory options and env settings
tomorrow.

BTW the current flink cluster is launched like:
yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120

with flink-conf.yaml property overrides of:
# so bigger clusters don't fail to init
akka.ask.timeout: 60s
# so more memory is given to the JVM from the yarn container
containerized.heap-cutoff-ratio: 0.15

So each flink slot doesn't necessarily get a lot of ram, you said 70% of
ram goes to the job by default, so that's (9200*0.85)*0.70 = 5474MB.  So
each slot is sitting with ~2737MB of usable space.  Would you have a
different config for taking overall the same amount of ram?




On Wed, Dec 6, 2017 at 11:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Garrett,
>
> data skew might be a reason for the performance degradation.
>
> The plan you shared is pretty simple. The following happens when you run
> the program:
> - The data source starts to read data and pushes the records to the
> FlatMapFunction. From there the records are shuffled (using
> hash-partitioning) to the sorter.
> - The sorter tasks consume the records and write them into a memory
> buffer. When the buffer is full, it is sorted and spilled to disk. When the
> buffer was spilled, it is filled again with records, sorted, and spilled.
> - The initially fast processing happens because at the beginning the
> sorter is not waiting for buffers to be sorted or spilled because they are
> empty.
>
> The performance of the plan depends (among other things) on the size of
> the sort buffers. The sort buffers are taken from Flink's managed memory.
> Unless you configured something else, 70% of the TaskManager heap
> memory is reserved as managed memory.
> If you use Flink only for batch jobs, I would enable preallocation and
> off-heap memory (see configuration options [1]). You can also configure a
> fixed size for the managed memory. The more memory you configure, the more
> is available for sorting.
>
> The managed memory of a TM is evenly distributed to all its processing
> slots. Hence, having more slots per TM means that each slot has fewer
> managed memory (for sorting or joins or ...).
> So many slots are not necessarily good for performance (unless you
> increase the number of TMs / memory as well), especially in case of data
> skew when most slots receive only little data and cannot leverage their
> memory.
> If your data is heavily skewed, it might make sense to have fewer slots
> such that each slot has more memory for sorting.
>
> Skew has also an effect on downstream operations. In case of skew, some of
> the sorter tasks are overloaded and cannot accept more data.
> Due to the pipelined shuffles, this leads to a back pressure behavior that
> propagates down to the sources.
> You can disable pipelining by setting the execution mode on the execution
> configuration to BATCH [2]. This will break the pipeline but write the
> result of the FlatMap to disk.
> This might help, if the FlatMap is compute intensive or filters many
> records.
>
> The data sizes don't sound particular large, so this should be something
> that Flink should be able to handle.
>
> Btw. you don't need to convert the JSON plan output. You can paste it into
> the plan visualizer [3].
> I would not worry about the missing statistics. The optimizer does not
> leverage them at the current state.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/setup/config.html#managed-memory
> [2] https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/dev/execution_configuration.html
> [3] http://flink.apache.org/visualizer/
>
> 2017-12-06 16:45 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>
>> Fabian,
>>
>>  Thank you for the reply.  Yes I do watch via the ui, is there another
>> way to see progress through the steps?
>>
>> I think I just figured it out, the hangup is in the sort phase (ID 4)
>> where 2 slots take all the time.  Looking in the UI most slots get less
>> than 500MB of data to sort, these two have 6.7GB and 7.3GB each, together
>> its about 272M records and these will run for hours at this point.  Looks
>> like I need to figure out a different partitioning/sort strategy. I never
>> noticed before because when I run the system at ~1400 slots I don't use the
>> UI anymore as its gets unresponsive.  400 Slots is painfully slow, but
>> still works.
>>
>>
>> The getEnv output is very cool! Also very big, I've tried to summarize it
>> here in more of a yaml format as its on a different network.  Note the
>> parallelism was just set to 10 as I didn't know if that effected out

Re: Flink Batch Performance degradation at scale

2017-12-06 Thread Garrett Barton
: unknown
name: Est Cardinality value: unknown
costs:
name: Network value: 0
name: Disk I/O value 0
name: CPU value: 0
name: Cumulative Network value: unknown
name: Cumulative Disk I/O value: unknown
name: Cumulative CPU value: unknown
compiler_hints:
name: Output Size (bytes) value: none
name: Output Cardinality value: none
name: Avg. Output Record Size (bytes) value: none
name: Filter Factor value: none


id: 2
type: pact
pact: Map
contents: Map at ()
parallelism: 10
predecessors:
id: 3, ship_strategy: Forward, exchange_mode: PIPELINED
driver_strategy: Map
global_properties:
name: partitioning v: RANDOM_PARTITIONED
name: Partitioning Order value: none
name: Uniqueness value: not unique
local_properties:
name: Order value: none
name: Grouping value: not grouped
name: Uniqueness value: not unique
estimates:
name: Est. Output Size value: unknown
name: Est Cardinality value: unknown
costs:
name: Network value: 0
name: Disk I/O value 0
name: CPU value: 0
name: Cumulative Network value: unknown
name: Cumulative Disk I/O value: unknown
name: Cumulative CPU value: unknown
compiler_hints:
name: Output Size (bytes) value: none
name: Output Cardinality value: none
name: Avg. Output Record Size (bytes) value: none
name: Filter Factor value: none

id: 1
type: pact
pact: Map
contents: map at main()
parallelism: 10
predecessors:
id: 2, ship_strategy: Forward, exchange_mode: PIPELINED
driver_strategy: Map
global_properties:
name: partitioning v: RANDOM_PARTITIONED
name: Partitioning Order value: none
name: Uniqueness value: not unique
local_properties:
name: Order value: none
name: Grouping value: not grouped
name: Uniqueness value: not unique
estimates:
name: Est. Output Size value: unknown
name: Est Cardinality value: unknown
costs:
name: Network value: 0
name: Disk I/O value 0
name: CPU value: 0
name: Cumulative Network value: unknown
name: Cumulative Disk I/O value: unknown
name: Cumulative CPU value: unknown
compiler_hints:
name: Output Size (bytes) value: none
name: Output Cardinality value: none
name: Avg. Output Record Size (bytes) value: none
name: Filter Factor value: none

id: 0
type: sink
pact: Data Sink
contents: org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat
parallelism: 10
predecessors:
id: 1, ship_strategy: Forward, exchange_mode: PIPELINED
driver_strategy: Map
global_properties:
name: partitioning v: RANDOM_PARTITIONED
name: Partitioning Order value: none
name: Uniqueness value: not unique
local_properties:
name: Order value: none
name: Grouping value: not grouped
name: Uniqueness value: not unique
estimates:
name: Est. Output Size value: unknown
name: Est Cardinality value: unknown
costs:
name: Network value: 0
name: Disk I/O value 0
name: CPU value: 0
name: Cumulative Network value: unknown
name: Cumulative Disk I/O value: unknown
name: Cumulative CPU value: unknown
compiler_hints:
name: Output Size (bytes) value: none
name: Output Cardinality value: none
name: Avg. Output Record Size (bytes) value: none
name: Filter Factor value: none
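
For anyone who wants the same dump, a minimal sketch of printing the JSON plan
instead of executing (the sink path is made up; the output can be pasted into
http://flink.apache.org/visualizer/):

import org.apache.flink.api.java.ExecutionEnvironment;

public class PlanDumpSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // A trivial stand-in job; the plan needs at least one sink.
        env.generateSequence(1, 100).writeAsText("hdfs:///tmp/plan-dump-sketch");

        // Prints the JSON plan instead of running the job.
        System.out.println(env.getExecutionPlan());
    }
}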




On Tue, Dec 5, 2017 at 5:36 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> Flink's operators are designed to work in memory as long as possible and
> spill to disk once the memory budget is exceeded.
> Moreover, Flink aims to run programs in a pipelined fashion, such that
> multiple operators can process data at the same time.
> This behavior can make it a bit tricky to analyze the runtime behavior and
> progress of operators.
>
> It would be interesting to have a look at the execution plan for the
> program that you are running.
> The plan can be obtained from the ExecutionEnvironment by calling
> env.getExecutionPlan() instead of env.execute().
>
> I would also like to know how you track the progress of the program.
> Are you looking at the record counts displayed in the WebUI?
>
> Best,
> Fabian
>
>
>
> 2017-12-05 22:03 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>
>> I have been moving some old MR and hive workflows into Flink because I'm
>> enjoying the api's and the ease of development is wonderful.  Things have
>> largely worked great until I tried to really scale some of the jobs
>> recently.
>>
>> I have for example

Flink Batch Performance degradation at scale

2017-12-05 Thread Garrett Barton
I have been moving some old MR and Hive workflows into Flink because I'm
enjoying the APIs, and the ease of development is wonderful.  Things have
largely worked great until I tried to really scale some of the jobs
recently.

I have for example one ETL job that reads in about 12B records at a time
and does a sort, some simple transformations, validation, a re-partition,
and then output to a Hive table.
When I built it with the sample set, ~200M, it worked great; it took maybe a
minute and blew through it.

What I have observed is that there is some kind of saturation reached
depending on the number of slots, number of nodes, and the overall size of
data to move.  When I run the 12B set, the first 1B go through in under 1
minute, really really fast.  But there is an extremely sharp drop-off after
that: the next 1B might take 15 minutes, and then if I wait for the next 1B,
it's well over an hour.

What I can't find are any obvious indicators or things to look at;
everything just grinds to a halt, and I don't think the job would ever
actually complete.

Is there something in the design of Flink in batch mode that is perhaps
memory bound?  Adding more nodes/tasks does not fix it, just gets me a
little further along.  I'm already running around ~1,400 slots at this
point; I'd postulate needing 10,000+ to potentially make the job run, but
that's too much of my cluster gone, and I have yet to get Flink to be stable
past 1,500.

Any ideas on where to look, or what to debug?  The GUI is also very
cumbersome to use at this slot count, so other measurement ideas are welcome
too!

Thank you all.
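
One way to check for skew (the MapPartition-based per-partition count Fabian
suggests in the replies) is sketched below; the key field and sizes are made
up:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class PartitionSkewCheck {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the real records; key on whatever the real job partitions by.
        DataSet<Tuple2<Long, String>> records = env.generateSequence(1, 100_000)
                .map(new MapFunction<Long, Tuple2<Long, String>>() {
                    @Override
                    public Tuple2<Long, String> map(Long i) {
                        return Tuple2.of(i % 7, "payload-" + i);
                    }
                });

        // Apply the same hash partitioning the job uses, then count records per
        // subtask; a few huge counts next to many tiny ones indicates skew.
        records.partitionByHash(0)
                .mapPartition(new RichMapPartitionFunction<Tuple2<Long, String>, Tuple2<Integer, Long>>() {
                    @Override
                    public void mapPartition(Iterable<Tuple2<Long, String>> values,
                                             Collector<Tuple2<Integer, Long>> out) {
                        long count = 0;
                        for (Tuple2<Long, String> ignored : values) {
                            count++;
                        }
                        out.collect(Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), count));
                    }
                })
                .print();
    }
}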


Re: Classpath/ClassLoader issues

2017-10-06 Thread Garrett Barton
Fabian,

 Just to follow up on this, I took the patch, compiled that class, and stuck
it into the existing 1.3.2 jar, and all is well.  (I couldn't get all of
Flink to build correctly.)

Thank you!

On Wed, Sep 20, 2017 at 3:53 PM, Garrett Barton <garrett.bar...@gmail.com>
wrote:

> Fabian,
>  Awesome!  After your initial email I got things to work by deploying my
> fat jar into the flink/lib folder, and voila! it worked. :)  I will grab
> your pull request and give it a go tomorrow.
>
> On Wed, Sep 20, 2017 at 1:03 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Here's the pull request that hopefully fixes your issue:
>> https://github.com/apache/flink/pull/4690
>>
>> Best, Fabian
>>
>> 2017-09-20 16:15 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>
>>> Hi Garrett,
>>>
>>> I think I identified the problem.
>>> You said you put the Hive/HCat dependencies into your user fat Jar,
>>> correct? In this case, they are loaded with Flink's userClassLoader (as
>>> described before).
>>>
>>> In the OutputFormatVertex.finalizeOnMaster() method, Flink correctly
>>> loads the user classes with the user class loader.
>>> However, when the HCatOutputFormat.getOutputCommitter() method is
>>> called, Hive tries to load additional classes with the current thread class
>>> loader (see at org.apache.hadoop.hive.common.
>>> JavaUtils.loadClass(JavaUtils.java:78)).
>>> This behavior is actually OK, because we usually set the context
>>> classloader to be the user classloader before calling user code. However,
>>> this has not been done here.
>>> So, this is in fact a bug.
>>>
>>> I created this JIRA issue: https://issues.apache.org/jira
>>> /browse/FLINK-7656 and will open a PR for that.
>>>
>>> Thanks for helping to diagnose the issue,
>>> Fabian
>>>
>>> 2017-09-19 22:05 GMT+02:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>
>>>> Fabian,
>>>>
>>>>  It looks like hive instantiates both input and output formats when
>>>> doing either. I use hive 1.2.1, and you can see in
>>>> HCatUtil.getStorageHandler where it tries to load both.  It looks like its
>>>> happening after the writes complete and flink is in the finish/finalize
>>>> stage.  When I watch the counters in the Flink ui, i see all output tasks
>>>> mark finished along with bytes sent and records sent being exactly what I
>>>> expect them to be.  The first error also mentions the master, is this the
>>>> flink jobmanager process then?
>>>>
>>>> The expanded stacktrace is:
>>>>
>>>> Caused by: java.lang.Exception: Failed to finalize execution on master
>>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.verte
>>>> xFinished(ExecutionGraph.java:1325)
>>>> at org.apache.flink.runtime.executiongraph.ExecutionVertex.exec
>>>> utionFinished(ExecutionVertex.java:688)
>>>> at org.apache.flink.runtime.executiongraph.Execution.markFinish
>>>> ed(Execution.java:797)
>>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updat
>>>> eState(ExecutionGraph.java:1477)
>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$hand
>>>> leMessage$1$$anonfun$applyOrElse$4.apply$mcV$sp(JobManager.scala:710)
>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$hand
>>>> leMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$hand
>>>> leMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>>>> ... 8 more
>>>> Caused by: java.lang.RuntimeException: java.io.IOException: Failed to
>>>> load foster storage handler
>>>> at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputForma
>>>> tBase.finalizeGlobal(HadoopOutputFormatBase.java:202)
>>>> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.finaliz
>>>> eOnMaster(OutputFormatVertex.java:118)
>>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.verte
>>>> xFinished(ExecutionGraph.java:1320)
>>>> ... 14 more
>>>> Caused by: java.io.IOException: Failed to load foster storage handler
>>>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(H
>>>> CatUtil.java:409)
>>>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(H
>>>> CatUtil.java

Re: At end of complex parallel flow, how to force end step with parallel=1?

2017-10-05 Thread Garrett Barton
Fabian,

 Turns out I was wrong.  My flow was in fact running in two separate jobs
due to me trying to use a local variable calculated by
...distinct().count() in a downstream flow.  The second flow indeed set
parallelism correctly!  Thank you for the help. :)

On Wed, Oct 4, 2017 at 8:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Garrett,
>
> that's strange. DataSet.reduceGroup() will create a non-parallel
> GroupReduce operator.
> So even without setting the parallelism manually to 1, the operator should
> not run in parallel.
> What might happen though is that a combiner is applied to locally reduce
> the data before it is shipped to the single instance.
> Does your GroupReduceFunction implement a Combiner interface?
>
> I'm not aware of visualization problems of the web UI.
> Can you maybe share a screenshot of the UI showing the issue?
>
> Thanks, Fabian
>
> 2017-10-03 21:57 GMT+02:00 Garrett Barton <garrett.bar...@gmail.com>:
>
>> Gábor,
>> Thank you for the reply, I gave that a go and the flow still showed
>> parallel 90 for each step.  Is the ui not 100% accurate perhaps?
>>
>> To get around it for now I implemented a partitioner that threw all the
>> data to the same partition, hack but works!
>>
>> On Tue, Oct 3, 2017 at 4:12 AM, Gábor Gévay <gga...@gmail.com> wrote:
>>
>>> Hi Garrett,
>>>
>>> You can call .setParallelism(1) on just this operator:
>>>
>>> ds.reduceGroup(new GroupReduceFunction...).setParallelism(1)
>>>
>>> Best,
>>> Gabor
>>>
>>>
>>>
>>> On Mon, Oct 2, 2017 at 3:46 PM, Garrett Barton <garrett.bar...@gmail.com>
>>> wrote:
>>> > I have a complex alg implemented using the DataSet api and by default
>>> it
>>> > runs with parallel 90 for good performance. At the end I want to
>>> perform a
>>> > clustering of the resulting data and to do that correctly I need to
>>> pass all
>>> > the data through a single thread/process.
>>> >
>>> > I read in the docs that as long as I did a global reduce using
>>> > DataSet.reduceGroup(new GroupReduceFunction) that it would force
>>> it to a
>>> > single thread.  Yet when I run the flow and bring it up in the ui, I
>>> see
>>> > parallel 90 all the way through the dag including this one.
>>> >
>>> > Is there a config or feature to force the flow back to a single
>>> thread?  Or
>>> > should I just split this into two completely separate jobs?  I'd
>>> rather not
>>> > split as I would like to use flinks ability to iterate on this alg and
>>> > cluster combo.
>>> >
>>> > Thank you
>>>
>>
>>
>


Re: At end of complex parallel flow, how to force end step with parallel=1?

2017-10-03 Thread Garrett Barton
Gábor,
Thank you for the reply, I gave that a go and the flow still showed
parallelism 90 for each step.  Is the UI not 100% accurate perhaps?

To get around it for now I implemented a partitioner that threw all the
data to the same partition, hack but works!
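
For completeness, the "everything to one partition" workaround is roughly
this (a sketch assuming a String key; any key type works the same way):

import org.apache.flink.api.common.functions.Partitioner;

// Route every record to the same partition so the final step effectively
// runs in one place.
public class SinglePartitionHack implements Partitioner<String> {
    @Override
    public int partition(String key, int numPartitions) {
        return 0;
    }
}

Applied with something like ds.partitionCustom(new SinglePartitionHack(),
keySelector) just before the final operator.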

On Tue, Oct 3, 2017 at 4:12 AM, Gábor Gévay <gga...@gmail.com> wrote:

> Hi Garrett,
>
> You can call .setParallelism(1) on just this operator:
>
> ds.reduceGroup(new GroupReduceFunction...).setParallelism(1)
>
> Best,
> Gabor
>
>
>
> On Mon, Oct 2, 2017 at 3:46 PM, Garrett Barton <garrett.bar...@gmail.com>
> wrote:
> > I have a complex alg implemented using the DataSet api and by default it
> > runs with parallel 90 for good performance. At the end I want to perform
> a
> > clustering of the resulting data and to do that correctly I need to pass
> all
> > the data through a single thread/process.
> >
> > I read in the docs that as long as I did a global reduce using
> > DataSet.reduceGroup(new GroupReduceFunction) that it would force it
> to a
> > single thread.  Yet when I run the flow and bring it up in the ui, I see
> > parallel 90 all the way through the dag including this one.
> >
> > Is there a config or feature to force the flow back to a single thread?
> Or
> > should I just split this into two completely separate jobs?  I'd rather
> not
> > split as I would like to use flinks ability to iterate on this alg and
> > cluster combo.
> >
> > Thank you
>


At end of complex parallel flow, how to force end step with parallel=1?

2017-10-02 Thread Garrett Barton
I have a complex algorithm implemented using the DataSet API, and by default
it runs with parallelism 90 for good performance.  At the end I want to
perform a clustering of the resulting data, and to do that correctly I need
to pass all the data through a single thread/process.

I read in the docs that as long as I did a global reduce using
DataSet.reduceGroup(new GroupReduceFunction) it would force that step to a
single thread.  Yet when I run the flow and bring it up in the UI, I see
parallelism 90 all the way through the DAG, including for this one.
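
For reference, the pattern from the docs looks roughly like this (a sketch of
a non-grouped reduceGroup; setParallelism(1) just makes the single-threaded
intent explicit):

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class GlobalReduceSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The upstream flow can run at high parallelism; only the final reduce
        // needs to see everything in one place.
        DataSet<Long> data = env.generateSequence(1, 1000);

        // A non-grouped reduceGroup treats the whole DataSet as a single group.
        data.reduceGroup(new GroupReduceFunction<Long, Long>() {
                    @Override
                    public void reduce(Iterable<Long> values, Collector<Long> out) {
                        long sum = 0;
                        for (Long v : values) {
                            sum += v;
                        }
                        out.collect(sum);
                    }
                })
                .setParallelism(1)
                .print();
    }
}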

Is there a config or feature to force the flow back to a single thread?  Or
should I just split this into two completely separate jobs?  I'd rather not
split, as I would like to use Flink's ability to iterate on this algorithm
and clustering combo.

Thank you


Re: Classpath/ClassLoader issues

2017-09-20 Thread Garrett Barton
Fabian,
 Awesome!  After your initial email I got things to work by deploying my
fat jar into the flink/lib folder, and voila! it worked. :)  I will grab
your pull request and give it a go tomorrow.
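
For context, the fix boils down to making the user classloader the thread's
context classloader around the call into Hive/HCatalog; a rough sketch of the
idea (not the actual patch):

public final class ContextClassLoaderSketch {

    // Runs an action with the given classloader as the thread's context
    // classloader, restoring the previous one afterwards.  The point is that
    // the HCatOutputFormat.getOutputCommitter() call happens inside such a
    // wrapper, so Hive resolves classes through the user-code classloader.
    public static void runWithClassLoader(ClassLoader userCodeClassLoader, Runnable action) {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(userCodeClassLoader);
            action.run();
        } finally {
            Thread.currentThread().setContextClassLoader(original);
        }
    }
}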

On Wed, Sep 20, 2017 at 1:03 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Here's the pull request that hopefully fixes your issue:
> https://github.com/apache/flink/pull/4690
>
> Best, Fabian
>
> 2017-09-20 16:15 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Garrett,
>>
>> I think I identified the problem.
>> You said you put the Hive/HCat dependencies into your user fat Jar,
>> correct? In this case, they are loaded with Flink's userClassLoader (as
>> described before).
>>
>> In the OutputFormatVertex.finalizeOnMaster() method, Flink correctly
>> loads the user classes with the user class loader.
>> However, when the HCatOutputFormat.getOutputCommitter() method is
>> called, Hive tries to load additional classes with the current thread class
>> loader (see at org.apache.hadoop.hive.common.
>> JavaUtils.loadClass(JavaUtils.java:78)).
>> This behavior is actually OK, because we usually set the context
>> classloader to be the user classloader before calling user code. However,
>> this has not been done here.
>> So, this is in fact a bug.
>>
>> I created this JIRA issue: https://issues.apache.org/jira
>> /browse/FLINK-7656 and will open a PR for that.
>>
>> Thanks for helping to diagnose the issue,
>> Fabian
>>
>> 2017-09-19 22:05 GMT+02:00 Garrett Barton <garrett.bar...@gmail.com>:
>>
>>> Fabian,
>>>
>>>  It looks like hive instantiates both input and output formats when
>>> doing either. I use hive 1.2.1, and you can see in
>>> HCatUtil.getStorageHandler where it tries to load both.  It looks like its
>>> happening after the writes complete and flink is in the finish/finalize
>>> stage.  When I watch the counters in the Flink ui, i see all output tasks
>>> mark finished along with bytes sent and records sent being exactly what I
>>> expect them to be.  The first error also mentions the master, is this the
>>> flink jobmanager process then?
>>>
>>> The expanded stacktrace is:
>>>
>>> Caused by: java.lang.Exception: Failed to finalize execution on master
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.verte
>>> xFinished(ExecutionGraph.java:1325)
>>> at org.apache.flink.runtime.executiongraph.ExecutionVertex.exec
>>> utionFinished(ExecutionVertex.java:688)
>>> at org.apache.flink.runtime.executiongraph.Execution.markFinish
>>> ed(Execution.java:797)
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updat
>>> eState(ExecutionGraph.java:1477)
>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$hand
>>> leMessage$1$$anonfun$applyOrElse$4.apply$mcV$sp(JobManager.scala:710)
>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$hand
>>> leMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$hand
>>> leMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>>> ... 8 more
>>> Caused by: java.lang.RuntimeException: java.io.IOException: Failed to
>>> load foster storage handler
>>> at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputForma
>>> tBase.finalizeGlobal(HadoopOutputFormatBase.java:202)
>>> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.finaliz
>>> eOnMaster(OutputFormatVertex.java:118)
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.verte
>>> xFinished(ExecutionGraph.java:1320)
>>> ... 14 more
>>> Caused by: java.io.IOException: Failed to load foster storage handler
>>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(H
>>> CatUtil.java:409)
>>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(H
>>> CatUtil.java:367)
>>> at org.apache.hive.hcatalog.mapreduce.HCatBaseOutputFormat.getO
>>> utputFormat(HCatBaseOutputFormat.java:77)
>>> at org.apache.hive.hcatalog.mapreduce.HCatOutputFormat.getOutpu
>>> tCommitter(HCatOutputFormat.java:275)
>>> at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputForma
>>> tBase.finalizeGlobal(HadoopOutputFormatBase.java:200)
>>> ... 16 more
>>> Caused by: java.lang.ClassNotFoundException:
>>> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>> at java.lang.C

Re: Classpath/ClassLoader issues

2017-09-19 Thread Garrett Barton
Fabian,

 It looks like Hive instantiates both input and output formats when doing
either.  I use Hive 1.2.1, and you can see in HCatUtil.getStorageHandler
where it tries to load both.  It looks like it's happening after the writes
complete and Flink is in the finish/finalize stage.  When I watch the
counters in the Flink UI, I see all output tasks marked finished, with bytes
sent and records sent being exactly what I expect them to be.  The first
error also mentions the master; is this the Flink jobmanager process then?

The expanded stacktrace is:

Caused by: java.lang.Exception: Failed to finalize execution on master
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1325)
at
org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:688)
at
org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:797)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1477)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply$mcV$sp(JobManager.scala:710)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
... 8 more
Caused by: java.lang.RuntimeException: java.io.IOException: Failed to load
foster storage handler
at
org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:202)
at
org.apache.flink.runtime.jobgraph.OutputFormatVertex.finalizeOnMaster(OutputFormatVertex.java:118)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1320)
... 14 more
Caused by: java.io.IOException: Failed to load foster storage handler
at
org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:409)
at
org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:367)
at
org.apache.hive.hcatalog.mapreduce.HCatBaseOutputFormat.getOutputFormat(HCatBaseOutputFormat.java:77)
at
org.apache.hive.hcatalog.mapreduce.HCatOutputFormat.getOutputCommitter(HCatOutputFormat.java:275)
at
org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:200)
... 16 more
Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:74)
at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(FosterStorageHandler.java:68)
at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:404)


Thank you all for any help. :)

On Tue, Sep 19, 2017 at 11:05 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Garrett,
>
> Flink distinguishes between two classloaders: 1) the system classloader
> which is the main classloader of the process. This classloader loads all
> jars in the ./lib folder and 2) the user classloader which loads the job
> jar.
> AFAIK, the different operators do not have distinct classloaders. So, in
> principle all operators should use the same user classloader.
>
> According to the stacktrace you posted, the OrcInputFormat cannot be found
> when you try to emit to an ORC file.
> This looks suspicious because I would rather expect the OrcOutputFormat to
> be the problem than the input format.
> Can you post more of the stacktrace? This would help to identify the spot
> in the Flink code where the exception is thrown.
>
> Thanks, Fabian
>
> 2017-09-18 18:42 GMT+02:00 Garrett Barton <garrett.bar...@gmail.com>:
>
>> Hey all,
>>
>>  I am trying out a POC with flink on yarn.  My simple goal is to read
>> from a Hive ORC table, process some data and write to a new Hive ORC table.
>>
>> Currently I can get Flink to read the source table fine, both with using
>> The HCatalog Input format directly, and by using the flink-hcatalog
>> wrapper.  Processing the data also works fine. Dumping to console or a text
>> file also works fine.
>>
>> I'm now stuck trying to write the data out, I'm getting
>> ClassNotFoundExceptions:
>>
>> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io
>> .orc.OrcInputFormat
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLo