Re: Classpath/ClassLoader issues

2017-10-08 Thread Fabian Hueske
Thanks for the feedback, Garrett!
Good to know that this fixes the problem.

The patch will be included in the next releases.

Best, Fabian

2017-10-06 20:31 GMT+02:00 Garrett Barton :

> Fabian,
>
>  Just to follow up on this: I took the patch, compiled that class, and
> stuck it into the existing 1.3.2 jar, and all is well. (I couldn't get all
> of Flink to build correctly.)
>
> Thank you!
>
> On Wed, Sep 20, 2017 at 3:53 PM, Garrett Barton 
> wrote:
>
>> Fabian,
>>  Awesome!  After your initial email I got things to work by deploying my
>> fat jar into the flink/lib folder, and voilà! It worked. :)  I will grab
>> your pull request and give it a go tomorrow.
>>
>> On Wed, Sep 20, 2017 at 1:03 PM, Fabian Hueske  wrote:
>>
>>> Here's the pull request that hopefully fixes your issue:
>>> https://github.com/apache/flink/pull/4690
>>>
>>> Best, Fabian
>>>
>>> 2017-09-20 16:15 GMT+02:00 Fabian Hueske :
>>>
 Hi Garrett,

 I think I identified the problem.
 You said you put the Hive/HCat dependencies into your user fat Jar,
 correct? In this case, they are loaded with Flink's userClassLoader (as
 described before).

 In the OutputFormatVertex.finalizeOnMaster() method, Flink correctly
 loads the user classes with the user class loader.
 However, when the HCatOutputFormat.getOutputCommitter() method is
 called, Hive tries to load additional classes with the current thread class
 loader (see org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)).
 This behavior is actually OK, because we usually set the context
 classloader to be the user classloader before calling user code. However,
 this has not been done here.
 So, this is in fact a bug.
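
 For illustration, the usual fix for this class of bug is to set the user
 code classloader as the thread's context classloader around the call into
 Hive. The sketch below only illustrates that general pattern; it is not the
 actual patch, and the class and method names are made up:

public final class UserCodeRunner {

    /**
     * Runs the given action with the user-code classloader installed as the
     * thread's context classloader, restoring the previous one afterwards.
     */
    public static <T> T runWithUserCodeClassLoader(
            ClassLoader userCodeClassLoader,
            java.util.concurrent.Callable<T> action) throws Exception {
        final Thread current = Thread.currentThread();
        final ClassLoader previous = current.getContextClassLoader();
        try {
            // Hive's JavaUtils.loadClass() resolves classes via the context
            // classloader, so it must point at the user-code classloader here.
            current.setContextClassLoader(userCodeClassLoader);
            return action.call();
        } finally {
            // Always restore the previous classloader.
            current.setContextClassLoader(previous);
        }
    }
}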

 I created this JIRA issue: https://issues.apache.org/jira/browse/FLINK-7656
 and will open a PR for that.

 Thanks for helping to diagnose the issue,
 Fabian

 2017-09-19 22:05 GMT+02:00 Garrett Barton :

> Fabian,
>
>  It looks like Hive instantiates both input and output formats when
> doing either. I use Hive 1.2.1, and you can see in
> HCatUtil.getStorageHandler where it tries to load both. It looks like it's
> happening after the writes complete and Flink is in the finish/finalize
> stage. When I watch the counters in the Flink UI, I see all output tasks
> marked finished, with bytes sent and records sent being exactly what I
> expect them to be. The first error also mentions the master; is that the
> Flink JobManager process?
>
> The expanded stacktrace is:
>
> Caused by: java.lang.Exception: Failed to finalize execution on master
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1325)
> at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:688)
> at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:797)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1477)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply$mcV$sp(JobManager.scala:710)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
> ... 8 more
> Caused by: java.lang.RuntimeException: java.io.IOException: Failed to load foster storage handler
> at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:202)
> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.finalizeOnMaster(OutputFormatVertex.java:118)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1320)
> ... 14 more
> Caused by: java.io.IOException: Failed to load foster storage handler
> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:409)
> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:367)
> at org.apache.hive.hcatalog.mapreduce.HCatBaseOutputFormat.getOutputFormat(HCatBaseOutputFormat.java:77)
> at org.apache.hive.hcatalog.mapreduce.HCatOutputFormat.getOutputCommitter(HCatOutputFormat.java:275)
> at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:200)
> ... 16 more
> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Laun

Re: Windowed Stream Queryable State Support

2017-10-08 Thread Kostas Kloudas
Hi Vijay,

If by “Windowed Stream Queryable State Support” you mean when Flink will
allow querying the state of an in-flight window, then yes, a version of that
will be available in 1.4.

Cheers,
Kostas

> On Oct 7, 2017, at 2:55 PM, vijayakumar palaniappan  
> wrote:
> 
> What is the state of Windowed Stream Queryable State Support?
> 
> Is it available in 1.3 or planned for 1.4?
> 
> Thanks
> Vijay
> 
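
For context, the sketch below shows what queryable state already looks like on
a plain keyed stream in 1.3, via KeyedStream#asQueryableState; querying the
contents of an in-flight window is the separate, upcoming feature Kostas
refers to above, not this API. The job and state names are made up for
illustration.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class QueryableKeyedStateJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Long>> counts = env
                .fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 2L));

        // Expose the latest value per key under the name "latest-count";
        // an external QueryableStateClient can then look values up by key.
        counts
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) {
                        return value.f0;
                    }
                })
                .asQueryableState("latest-count");

        env.execute("queryable-state-sketch");
    }
}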



Re: RocksDB segfault inside timer when accessing/clearing state

2017-10-08 Thread Stefan Richter
Hi,

I would assume that those segfaults are only observed *after* a job is already 
in the process of canceling? This is a known problem, but currently „accepted“ 
behaviour after discussions with Stephan and Aljoscha (in CC). From that 
discussion, the background is that the native RocksDB resource is disposed 
somewhere in the process of cancelation, and the timers are executed in a 
different thread than the main event processing loop that is exited in 
cancelation. We would currently either have to a) wait for all timer events to 
finish before cancelation or b) somehow synchronize every access to the RocksDB 
resource field. The problem with option a) is that it can delay cancelation for 
an uncertain amount of time and we want to cancel asap so that the job can 
restart immediately in case of failover. Option b) introduces additional costs 
per access under normal operations to avoid a problem after the point that a 
job is already canceling.
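
As a rough illustration of what option b) would mean in practice, the sketch
below guards a native handle with a lock and a disposed flag; all names are
made up for this example, and it is not Flink's actual RocksDB state backend
code. The lock acquisition on every access is exactly the cost under normal
operations mentioned above.

public final class GuardedNativeResource implements AutoCloseable {

    private final Object lock = new Object();
    private boolean disposed;

    /** Runs an access against the native resource unless it was already disposed. */
    public void access(Runnable nativeCall) {
        synchronized (lock) {              // cost paid on every state access
            if (disposed) {
                return;                    // resource already released during cancelation
            }
            nativeCall.run();              // e.g. a get/put/delete on the native handle
        }
    }

    @Override
    public void close() {
        synchronized (lock) {
            disposed = true;               // after this, no timer thread can race with disposal
            // release the native resource here
        }
    }
}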

Personally, I also absolutely don’t like the idea of accepting this faulty 
behaviour and would be in favour of a „cleaner“ solution, maybe somehow 
reworking how the timer events are executed or how they interact with normal processing.

Best,
Stefan

> Am 07.10.2017 um 05:44 schrieb Kien Truong :
> 
> Hi,
> 
> We are using processing timers to implement some state clean-up logic.
> After switching from FsStateBackend to RocksDB, we encounter a lot of 
> segfaults from the Time Trigger threads when accessing/clearing state values.
> 
> We currently use the latest 1.3-SNAPSHOT, with the patch upgrading RocksDB 
> to 5.6.1, because the segfaults happen less frequently with this version 
> than with the original FRocksDB.
> 
> Perhaps there's a race condition here. Any insights would be much 
> appreciated.
> 
> Best regards,
> Kien
>
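
For reference, the state clean-up pattern Kien describes above typically looks
like the following ProcessFunction sketch; the type parameters, state name,
and one-hour clean-up delay are illustrative assumptions. The onTimer()
callback is what runs on the timer thread and touches the (RocksDB) state
backend.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Must be applied on a keyed stream, since it uses keyed state.
public class StateCleanupFunction extends ProcessFunction<String, String> {

    private static final long TTL_MS = 60 * 60 * 1000L; // assumed clean-up delay

    private transient ValueState<String> lastValue;

    @Override
    public void open(Configuration parameters) {
        lastValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastValue", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        lastValue.update(value);
        // Schedule a processing-time timer that will clear the state later.
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + TTL_MS);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Runs on the timer ("Time Trigger") path; with RocksDB this touches
        // the native state backend, which is where the reported segfaults appear.
        lastValue.clear();
    }
}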