[ https://issues.apache.org/jira/browse/FLINK-2617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14729302#comment-14729302 ]

Stephan Ewen commented on FLINK-2617:
-------------------------------------

I agree, this is critical.

The reason for this exception is that the Hadoop code assumes that only one thread 
ever works with the configuration. That matches the MapReduce model, where each 
task runs in its own dedicated JVM.
In Flink, multiple tasks can run in one JVM, which goes against that assumption.

We need to synchronize the {{open()}} method of all MapReduce compatibility 
functions on a static lock object in order to prevent this.
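
For illustration, a rough sketch of what that could look like in the mapreduce {{HadoopInputFormatBase.open()}} (the lock field and the surrounding members are only indicative, not the actual class):

{code}
// One lock per JVM, shared by all instances of the compatibility wrapper, so that
// only one task at a time copies/reads the Hadoop Configuration during open().
private static final Object OPEN_MUTEX = new Object();

@Override
public void open(HadoopInputSplit split) throws IOException {
    synchronized (OPEN_MUTEX) {
        // Creating the TaskAttemptContext copies the Configuration; doing it under the
        // lock avoids the ConcurrentModificationException from the stack trace below.
        TaskAttemptContext context =
            HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
        try {
            this.recordReader =
                this.mapreduceInputFormat.createRecordReader(split.getHadoopInputSplit(), context);
            this.recordReader.initialize(split.getHadoopInputSplit(), context);
        } catch (InterruptedException e) {
            throw new IOException("Could not create the RecordReader.", e);
        }
    }
}
{code}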

A short-term mitigation is to change your setup to run more TaskManagers, each 
with a single slot; that way you also get one task per JVM.
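
For a YARN setup like the one in the stack trace, that would roughly mean something like the following (the numbers are only examples):

{code}
# flink-conf.yaml: one slot per TaskManager gives one task per JVM
taskmanager.numberOfTaskSlots: 1

# or, for a YARN session, more containers with one slot each
./bin/yarn-session.sh -n 20 -s 1
{code}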

> ConcurrentModificationException when using HCatRecordReader to access a hive 
> table
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-2617
>                 URL: https://issues.apache.org/jira/browse/FLINK-2617
>             Project: Flink
>          Issue Type: Bug
>          Components: Hadoop Compatibility
>            Reporter: Arnaud Linz
>            Priority: Critical
>
> I don't know if it's an HCat or a Flink problem, but when reading a Hive table 
> in a cluster with many slots (20 threads per container), I systematically run 
> into a {{ConcurrentModificationException}} in the copy constructor of a 
> {{Configuration}} object that is modified while it is being copied.
> From what I understand, this object is the one returned by 
> {{TaskAttemptContext.getConfiguration()}} on the context created by 
> {{HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID())}}.
> Maybe the job {{Configuration}} object passed to the constructor of 
> {{HadoopInputFormatBase}} should be cloned somewhere?
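> Something like the following (only a sketch of what I mean by cloning, using the Hadoop 
> {{Configuration}} copy constructor):
> {code}
> // Give each input format its own private copy of the job Configuration, so that copying
> // it in one task can no longer race with modifications of the shared instance in another.
> org.apache.hadoop.conf.Configuration privateCopy =
>     new org.apache.hadoop.conf.Configuration(job.getConfiguration());
> {code}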
> The stack trace is:
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
> org.apache.flink.client.program.Client.run(Client.java:413)
> org.apache.flink.client.program.Client.run(Client.java:356)
> org.apache.flink.client.program.Client.run(Client.java:349)
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
> com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:73)
> com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:69)
> com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:50)
> com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:88)
> com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:606)
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
> org.apache.flink.client.program.Client.run(Client.java:315)
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
> org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
> org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
> scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> akka.actor.ActorCell.invoke(ActorCell.scala:487)
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> akka.dispatch.Mailbox.run(Mailbox.scala:221)
> akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by:  java.util.ConcurrentModificationException
> java.util.HashMap$HashIterator.nextEntry(HashMap.java:926)
> java.util.HashMap$KeyIterator.next(HashMap.java:960)
> java.util.AbstractCollection.addAll(AbstractCollection.java:341)
> java.util.HashSet.<init>(HashSet.java:117)
> org.apache.hadoop.conf.Configuration.<init>(Configuration.java:554)
> org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:439)
> org.apache.hive.hcatalog.common.HCatUtil.getJobConfFromContext(HCatUtil.java:637)
> org.apache.hive.hcatalog.mapreduce.HCatRecordReader.createBaseRecordReader(HCatRecordReader.java:112)
> org.apache.hive.hcatalog.mapreduce.HCatRecordReader.initialize(HCatRecordReader.java:91)
> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:182)
> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:56)
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> java.lang.Thread.run(Thread.java:744)
> {code}
> Flink "user" code looks like:
> {code}
> import java.io.IOException;
> import java.io.Serializable;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.io.FileOutputFormat;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.fs.FileSystem.WriteMode;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.util.Collector;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.compress.CompressionCodec;
> import org.apache.hadoop.mapreduce.InputFormat;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hive.hcatalog.data.DefaultHCatRecord;
> import org.apache.hive.hcatalog.data.schema.HCatSchema;
> import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
> (...)
>         // Set up the HCatalog input and wrap it in Flink's Hadoop compatibility input format.
>         // The Job's Configuration is shared between HCatInputFormat and the Flink wrapper.
>         final Job job = Job.getInstance();
>         @SuppressWarnings({ "unchecked", "rawtypes" })
>         final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat =
>             new HadoopInputFormat<NullWritable, DefaultHCatRecord>(
>                 (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter),
>                 NullWritable.class,
>                 DefaultHCatRecord.class,
>                 job);
>         final HCatSchema inputSchema = HCatInputFormat.getTableSchema(job.getConfiguration());
>         // Read the table and convert each HCatRecord into the target bean type.
>         @SuppressWarnings("serial")
>         final DataSet<T> dataSet = cluster
>             .createInput(inputFormat)
>             .flatMap(new FlatMapFunction<Tuple2<NullWritable, DefaultHCatRecord>, T>() {
>                 @Override
>                 public void flatMap(Tuple2<NullWritable, DefaultHCatRecord> value, Collector<T> out)
>                         throws Exception { // NOPMD
>                     final T record = createBean(value.f1, inputSchema);
>                     out.collect(record);
>                 }
>             }).returns(beanClass);
> (...)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
