Thanks for the feedback, Garrett!
Good to know that this fixes the problem.

The patch will be included in the next releases.

Best, Fabian

2017-10-06 20:31 GMT+02:00 Garrett Barton <garrett.bar...@gmail.com>:

> Fabian,
>
>  Just to follow up on this, I took the patch, compiled that class and
> stuck it into the existing 1.3.2 jar and all is well. (I couldn't get all
> of Flink to build correctly.)
>
> Thank you!
>
> On Wed, Sep 20, 2017 at 3:53 PM, Garrett Barton <garrett.bar...@gmail.com>
> wrote:
>
>> Fabian,
>>  Awesome!  After your initial email I got things to work by deploying my
>> fat jar into the flink/lib folder, and voilà! It worked. :)  I will grab
>> your pull request and give it a go tomorrow.
>>
>> On Wed, Sep 20, 2017 at 1:03 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Here's the pull request that hopefully fixes your issue:
>>> https://github.com/apache/flink/pull/4690
>>>
>>> Best, Fabian
>>>
>>> 2017-09-20 16:15 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>
>>>> Hi Garrett,
>>>>
>>>> I think I identified the problem.
>>>> You said you put the Hive/HCat dependencies into your user fat Jar,
>>>> correct? In this case, they are loaded with Flink's userClassLoader (as
>>>> described before).
>>>>
>>>> In the OutputFormatVertex.finalizeOnMaster() method, Flink correctly
>>>> loads the user classes with the user class loader.
>>>> However, when the HCatOutputFormat.getOutputCommitter() method is
>>>> called, Hive tries to load additional classes with the current thread's
>>>> context class loader (see
>>>> org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)).
>>>> This behavior is actually OK, because we usually set the context
>>>> classloader to be the user classloader before calling user code. However,
>>>> this has not been done here.
>>>> So, this is in fact a bug.
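
The pattern described above can be sketched as follows. This is only an illustration, not the actual Flink patch: ContextClassLoaderSketch and runWithContextClassLoader are hypothetical names, and the empty URLClassLoader merely stands in for Flink's user classloader. The idea is to install the user classloader as the thread context classloader before calling user code (here, code that would end up in Hive's JavaUtils.loadClass) and to restore the previous loader afterwards.

```java
import java.net.URL;
import java.net.URLClassLoader;

class ContextClassLoaderSketch {

    /**
     * Runs the action with the given loader installed as the thread context
     * classloader and restores the previous loader afterwards, even if the
     * action throws.
     */
    static void runWithContextClassLoader(ClassLoader loader, Runnable action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            action.run();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // Assumption: an empty URLClassLoader stands in for the user classloader.
        ClassLoader userLoader =
                new URLClassLoader(new URL[0], ContextClassLoaderSketch.class.getClassLoader());
        ClassLoader before = Thread.currentThread().getContextClassLoader();

        runWithContextClassLoader(userLoader, () -> {
            // Libraries that resolve classes through the context classloader
            // would now go through userLoader.
            boolean swapped = Thread.currentThread().getContextClassLoader() == userLoader;
            System.out.println("context loader is user loader: " + swapped);
        });

        boolean restored = Thread.currentThread().getContextClassLoader() == before;
        System.out.println("context loader restored: " + restored);
    }
}
```

The try/finally matters because the thread is typically reused; leaving the user classloader installed after the call could leak it into unrelated work.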
>>>>
>>>> I created this JIRA issue:
>>>> https://issues.apache.org/jira/browse/FLINK-7656 and will open a PR for that.
>>>>
>>>> Thanks for helping to diagnose the issue,
>>>> Fabian
>>>>
>>>> 2017-09-19 22:05 GMT+02:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>>
>>>>> Fabian,
>>>>>
>>>>>  It looks like Hive instantiates both input and output formats when
>>>>> doing either. I use Hive 1.2.1, and you can see in
>>>>> HCatUtil.getStorageHandler where it tries to load both.  It looks like it's
>>>>> happening after the writes complete and Flink is in the finish/finalize
>>>>> stage.  When I watch the counters in the Flink UI, I see all output tasks
>>>>> marked finished, with bytes sent and records sent being exactly what I
>>>>> expect them to be.  The first error also mentions the master; is this the
>>>>> Flink JobManager process then?
>>>>>
>>>>> The expanded stacktrace is:
>>>>>
>>>>> Caused by: java.lang.Exception: Failed to finalize execution on master
>>>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1325)
>>>>> at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:688)
>>>>> at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:797)
>>>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1477)
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply$mcV$sp(JobManager.scala:710)
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>>>>> ... 8 more
>>>>> Caused by: java.lang.RuntimeException: java.io.IOException: Failed to load foster storage handler
>>>>> at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:202)
>>>>> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.finalizeOnMaster(OutputFormatVertex.java:118)
>>>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1320)
>>>>> ... 14 more
>>>>> Caused by: java.io.IOException: Failed to load foster storage handler
>>>>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:409)
>>>>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:367)
>>>>> at org.apache.hive.hcatalog.mapreduce.HCatBaseOutputFormat.getOutputFormat(HCatBaseOutputFormat.java:77)
>>>>> at org.apache.hive.hcatalog.mapreduce.HCatOutputFormat.getOutputCommitter(HCatOutputFormat.java:275)
>>>>> at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:200)
>>>>> ... 16 more
>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>> at java.lang.Class.forName0(Native Method)
>>>>> at java.lang.Class.forName(Class.java:348)
>>>>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
>>>>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.74)
>>>>> at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(FosterStorageHandler.68)
>>>>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:404)
>>>>>
>>>>>
>>>>> Thank you all for any help. :)
>>>>>
>>>>> On Tue, Sep 19, 2017 at 11:05 AM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Garrett,
>>>>>>
>>>>>> Flink distinguishes between two classloaders: 1) the system
>>>>>> classloader, which is the main classloader of the process and loads
>>>>>> all jars in the ./lib folder, and 2) the user classloader, which loads
>>>>>> the job jar.
>>>>>> AFAIK, the different operators do not have distinct classloaders. So,
>>>>>> in principle all operators should use the same user classloader.
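
To see which classloader can actually resolve a given class, a small diagnostic along these lines may help. It is a sketch under assumptions: ClassVisibilityCheck and isVisible are hypothetical names, not Flink or Hive APIs; only Class.forName, ClassLoader.getSystemClassLoader and Thread.getContextClassLoader are standard JDK calls. The default class name is the one from the stack trace in this thread.

```java
class ClassVisibilityCheck {

    /** Returns true if the named class can be resolved through the given loader. */
    static boolean isVisible(String className, ClassLoader loader) {
        try {
            // initialize=false: only locate and load the class, do not run
            // its static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0
                ? args[0]
                : "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";

        // The system classloader corresponds to the process classpath
        // (in Flink's case, the jars in ./lib).
        ClassLoader system = ClassLoader.getSystemClassLoader();
        // The context classloader is whatever the calling thread currently
        // carries; libraries such as Hive may resolve classes through it.
        ClassLoader context = Thread.currentThread().getContextClassLoader();

        System.out.println("system classloader sees " + name + ": " + isVisible(name, system));
        System.out.println("context classloader sees " + name + ": " + isVisible(name, context));
    }
}
```

The result depends entirely on where it runs: called from user code inside a job, the two answers can differ when the class is only in the job jar.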
>>>>>>
>>>>>> According to the stacktrace you posted, the OrcInputFormat cannot be
>>>>>> found when you try to emit to an ORC file.
>>>>>> This looks suspicious because I would rather expect the
>>>>>> OrcOutputFormat to be the problem than the input format.
>>>>>> Can you post more of the stacktrace? This would help to identify the
>>>>>> spot in the Flink code where the exception is thrown.
>>>>>>
>>>>>> Thanks, Fabian
>>>>>>
>>>>>> 2017-09-18 18:42 GMT+02:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>>>>
>>>>>>> Hey all,
>>>>>>>
>>>>>>>  I am trying out a POC with Flink on YARN.  My simple goal is to
>>>>>>> read from a Hive ORC table, process some data and write to a new Hive
>>>>>>> ORC table.
>>>>>>>
>>>>>>> Currently I can get Flink to read the source table fine, both by
>>>>>>> using the HCatalog input format directly and by using the
>>>>>>> flink-hcatalog wrapper.  Processing the data also works fine. Dumping
>>>>>>> to console or a text file also works fine.
>>>>>>>
>>>>>>> I'm now stuck trying to write the data out; I'm getting
>>>>>>> ClassNotFoundExceptions:
>>>>>>>
>>>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
>>>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>> at java.lang.Class.forName0(Native Method)
>>>>>>> at java.lang.Class.forName(Class.java:348)
>>>>>>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
>>>>>>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.74)
>>>>>>> at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(FosterStorageHandler.68)
>>>>>>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:404)
>>>>>>>
>>>>>>> Since I read from an ORC table, I know I have that class in my
>>>>>>> classpath.  So I'm wondering if each stage/step in a Flink process has
>>>>>>> some kind of special classloader that I am not aware of?  (Also, it's odd
>>>>>>> that it wants the input format and not the output format; not sure why yet.)
>>>>>>>
>>>>>>> My output code looks like this:
>>>>>>>
>>>>>>>
>>>>>>> Job job = Job.getInstance(conf);
>>>>>>>
>>>>>>> HCatOutputFormat.setOutput(job, OutputJobInfo.create("schema", "table", null));
>>>>>>> HCatSchema outSchema = HCatOutputFormat.getTableSchema(job.getConfiguration());
>>>>>>> HCatOutputFormat.setSchema(job.getConfiguration(), outSchema);
>>>>>>>
>>>>>>> HCatOutputFormat outputFormat = new HCatOutputFormat();
>>>>>>>
>>>>>>> HadoopOutputFormat<NullWritable, DefaultHCatRecord> out = new
>>>>>>> HadoopOutputFormat(outputFormat, job);
>>>>>>>
>>>>>>> // from previous processing step
>>>>>>> hcat.output(out);
>>>>>>> env.execute("run");
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> One other thing to note: I had to put
>>>>>>> flink-hadoop-compatibility_2.11-1.3.2.jar into the lib folder of the
>>>>>>> Flink distro.  Building my code in a shaded jar with that dependency
>>>>>>> did not work for me.  However, when I put the Hive/HCat jars in the lib
>>>>>>> folder, it caused lots of other errors.  Since the shading didn't work
>>>>>>> for the hadoop-compatibility jar, it makes me think there is some funky
>>>>>>> classloader stuff going on.  I don't understand why this doesn't work.
>>>>>>> The ORC code is shaded and verified in my jar, the classes are present,
>>>>>>> and I successfully read from an ORC table.
>>>>>>>
>>>>>>> Any help or explanation into how the classpath/classloading works
>>>>>>> would be wonderful!
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
