[ https://issues.apache.org/jira/browse/FLINK-2617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14736800#comment-14736800 ]

Fabian Hueske commented on FLINK-2617:
--------------------------------------

Hi [~ArnaudL], I opened a PR against the 0.10-SNAPSHOT master that synchronizes the 
open() calls in Flink's {{HadoopInputFormat}}.
Can you check whether that solves the problem with the {{HCatInputFormat}}? I can also 
port the fix to 0.9 if necessary.
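
For reference, the synchronization is along these lines (a sketch only; the field and 
method names approximate {{HadoopInputFormatBase}} and are not the exact PR code):

{code}
// Serialize reader creation/initialization across subtasks in the same JVM so that
// a shared Hadoop Configuration is never iterated while another thread mutates it.
private static final Object OPEN_MUTEX = new Object();

@Override
public void open(HadoopInputSplit split) throws IOException {
  TaskAttemptContext context =
      HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
  synchronized (OPEN_MUTEX) {
    try {
      // HCatRecordReader.initialize() copies the Configuration (via new JobConf(conf));
      // guarding the call avoids the ConcurrentModificationException reported below.
      this.recordReader =
          this.mapreduceInputFormat.createRecordReader(split.getHadoopInputSplit(), context);
      this.recordReader.initialize(split.getHadoopInputSplit(), context);
    } catch (InterruptedException e) {
      throw new IOException("Could not create RecordReader.", e);
    }
  }
  this.fetched = false;
}
{code}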

Btw, I did not find the reason for the exception, but I verified that Flink uses 
separate config objects in its code. So I assume the root cause is hidden somewhere 
in the HCat code.

> ConcurrentModificationException when using HCatRecordReader to access a hive 
> table
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-2617
>                 URL: https://issues.apache.org/jira/browse/FLINK-2617
>             Project: Flink
>          Issue Type: Bug
>          Components: Hadoop Compatibility
>            Reporter: Arnaud Linz
>            Priority: Critical
>
> I don't know if it's an HCat or a Flink problem, but when reading a Hive table 
> in a cluster with many slots (20 threads per container), I systematically run 
> into a {{ConcurrentModificationException}} in a copy method of a 
> {{Configuration}} object that changes during the copy.
> From what I understand, this object comes from 
> {{TaskAttemptContext.getConfiguration()}}, created by 
> {{HadoopUtils.instantiateTaskAttemptContext(configuration, new 
> TaskAttemptID());}} 
> Maybe the {{job.Configuration}} object passed to the constructor of 
> {{HadoopInputFormatBase}} should be cloned somewhere?
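> A defensive copy along these lines is what I have in mind (just a sketch of the 
> idea, not a tested workaround):
> {code}
> // Give the input format its own copy of the job Configuration, so no other thread
> // can mutate the instance while Hadoop/HCat iterates over it during a copy.
> org.apache.hadoop.conf.Configuration privateConf =
>     new org.apache.hadoop.conf.Configuration(job.getConfiguration());
> Job privateJob = Job.getInstance(privateConf);
> // pass privateJob (instead of job) to the HadoopInputFormat constructor
> {code}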
> The stack trace is:
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
> org.apache.flink.client.program.Client.run(Client.java:413)
> org.apache.flink.client.program.Client.run(Client.java:356)
> org.apache.flink.client.program.Client.run(Client.java:349)
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
> com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:73)
> com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:69)
> com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:50)
> com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:88)
> com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:606)
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
> org.apache.flink.client.program.Client.run(Client.java:315)
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
> org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
> org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
> scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> akka.actor.ActorCell.invoke(ActorCell.scala:487)
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> akka.dispatch.Mailbox.run(Mailbox.scala:221)
> akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by:  java.util.ConcurrentModificationException
> java.util.HashMap$HashIterator.nextEntry(HashMap.java:926)
> java.util.HashMap$KeyIterator.next(HashMap.java:960)
> java.util.AbstractCollection.addAll(AbstractCollection.java:341)
> java.util.HashSet.<init>(HashSet.java:117)
> org.apache.hadoop.conf.Configuration.<init>(Configuration.java:554)
> org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:439)
> org.apache.hive.hcatalog.common.HCatUtil.getJobConfFromContext(HCatUtil.java:637)
> org.apache.hive.hcatalog.mapreduce.HCatRecordReader.createBaseRecordReader(HCatRecordReader.java:112)
> org.apache.hive.hcatalog.mapreduce.HCatRecordReader.initialize(HCatRecordReader.java:91)
> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:182)
> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:56)
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> java.lang.Thread.run(Thread.java:744)
> {code}
> Flink "user" code looks like:
> {code}
> import java.io.IOException;
> import java.io.Serializable;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.io.FileOutputFormat;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.fs.FileSystem.WriteMode;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.util.Collector;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.compress.CompressionCodec;
> import org.apache.hadoop.mapreduce.InputFormat;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hive.hcatalog.data.DefaultHCatRecord;
> import org.apache.hive.hcatalog.data.schema.HCatSchema;
> import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
> (...) 
>         final Job job = Job.getInstance();
>         @SuppressWarnings({ "unchecked", "rawtypes" })
>         final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat =
>             new HadoopInputFormat<NullWritable, DefaultHCatRecord>(
>                 (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter), //
>                 NullWritable.class, //
>                 DefaultHCatRecord.class, //
>                 job);
>         final HCatSchema inputSchema =
>             HCatInputFormat.getTableSchema(job.getConfiguration());
>         @SuppressWarnings("serial")
>         final DataSet<T> dataSet = cluster
>             .createInput(inputFormat)
>             .flatMap(new FlatMapFunction<Tuple2<NullWritable, DefaultHCatRecord>, T>() {
>                 @Override
>                 public void flatMap(Tuple2<NullWritable, DefaultHCatRecord> value,
>                         Collector<T> out) throws Exception { // NOPMD
>                     final T record = createBean(value.f1, inputSchema);
>                     out.collect(record);
>                 }
>             }).returns(beanClass);
> (...)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
