[ https://issues.apache.org/jira/browse/FLINK-2617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14740495#comment-14740495 ]
Fabian Hueske commented on FLINK-2617:
--------------------------------------

I see. SNAPSHOT builds are only uploaded if all tests pass. The last build failed due to some flaky tests. I will push an empty commit to trigger another build. *fingersCrossed* that this build passes.

> ConcurrentModificationException when using HCatRecordReader to access a hive table
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-2617
>                 URL: https://issues.apache.org/jira/browse/FLINK-2617
>             Project: Flink
>          Issue Type: Bug
>          Components: Hadoop Compatibility
>            Reporter: Arnaud Linz
>            Assignee: Fabian Hueske
>            Priority: Critical
>             Fix For: 0.9, 0.10
>
> I don't know whether it is an HCatalog or a Flink problem, but when reading a Hive table in a cluster with many slots (20 threads per container), I systematically run into a {{ConcurrentModificationException}} in a copy method of a {{Configuration}} object that changes during the copy.
> From what I understand, this object comes from {{TaskAttemptContext.getConfiguration()}}, created by {{HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID())}}.
> Maybe the {{job.Configuration}} object passed to the constructor of {{HadoopInputFormatBase}} should be cloned somewhere?
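> A minimal sketch of that idea (hypothetical, not code from this report; {{ConfSnapshot}} and {{snapshot()}} are made-up names): take a private copy of the job {{Configuration}} while job setup is still single-threaded, and hand each parallel consumer the copy instead of the shared original.
> {code}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.mapreduce.Job;
>
> public final class ConfSnapshot {
>
>     private ConfSnapshot() {
>     }
>
>     // Defensive copy: Configuration's copy constructor snapshots all
>     // properties, so readers of the copy never iterate a map or set
>     // that another thread is still mutating.
>     public static Configuration snapshot(Job job) {
>         return new Configuration(job.getConfiguration());
>     }
> }
> {code}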
> The stack trace is:
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>     org.apache.flink.client.program.Client.run(Client.java:413)
>     org.apache.flink.client.program.Client.run(Client.java:356)
>     org.apache.flink.client.program.Client.run(Client.java:349)
>     org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
>     com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:73)
>     com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:69)
>     com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:50)
>     com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:88)
>     com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
>     sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     java.lang.reflect.Method.invoke(Method.java:606)
>     org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>     org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>     org.apache.flink.client.program.Client.run(Client.java:315)
>     org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
>     org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
>     org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
>     org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
>     scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
>     scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>     org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
>     org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>     scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>     org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>     akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>     akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.ConcurrentModificationException
>     java.util.HashMap$HashIterator.nextEntry(HashMap.java:926)
>     java.util.HashMap$KeyIterator.next(HashMap.java:960)
>     java.util.AbstractCollection.addAll(AbstractCollection.java:341)
>     java.util.HashSet.<init>(HashSet.java:117)
>     org.apache.hadoop.conf.Configuration.<init>(Configuration.java:554)
>     org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:439)
>     org.apache.hive.hcatalog.common.HCatUtil.getJobConfFromContext(HCatUtil.java:637)
>     org.apache.hive.hcatalog.mapreduce.HCatRecordReader.createBaseRecordReader(HCatRecordReader.java:112)
>     org.apache.hive.hcatalog.mapreduce.HCatRecordReader.initialize(HCatRecordReader.java:91)
>     org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:182)
>     org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:56)
>     org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>     org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     java.lang.Thread.run(Thread.java:744)
> {code}
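> Reading the bottom of the trace: {{HCatUtil.getJobConfFromContext()}} calls {{new JobConf(conf)}}, which invokes {{Configuration}}'s copy constructor, which in turn copies a {{HashSet}} field via {{new HashSet<>(...)}} (the {{finalParameters}} set, judging from the Hadoop sources). If another parallel task mutates the shared {{Configuration}} during that copy, the iteration fails fast. Below is a distilled, standalone reproduction of the failure mode, using plain JDK types rather than the actual HCatalog code:
> {code}
> import java.util.HashSet;
> import java.util.Set;
>
> public class ConfCopyRace {
>
>     public static void main(String[] args) {
>         // Stand-in for the HashSet-backed state inside a shared Configuration.
>         final Set<String> sharedSet = new HashSet<String>();
>
>         // Writer thread: stand-in for a parallel task mutating the shared object.
>         Thread writer = new Thread(new Runnable() {
>             @Override
>             public void run() {
>                 int i = 0;
>                 while (true) {
>                     sharedSet.add("param-" + (i++));
>                 }
>             }
>         });
>         writer.setDaemon(true);
>         writer.start();
>
>         // Reader: stand-in for new JobConf(conf). HashSet's copy constructor
>         // iterates the source set, so this dies almost immediately with
>         // java.util.ConcurrentModificationException, as in the trace above.
>         while (true) {
>             new HashSet<String>(sharedSet);
>         }
>     }
> }
> {code}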
> Flink "user" code looks like:
> {code}
> import java.io.IOException;
> import java.io.Serializable;
>
> import org.apache.flink.api.common.functions.FlatMapFunction; // was missing; required by the flatMap below
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.io.FileOutputFormat;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.fs.FileSystem.WriteMode;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.util.Collector;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.compress.CompressionCodec;
> import org.apache.hadoop.mapreduce.InputFormat;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hive.hcatalog.data.DefaultHCatRecord;
> import org.apache.hive.hcatalog.data.schema.HCatSchema;
> import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
>
> (...)
>
> final Job job = Job.getInstance();
>
> // All parallel source tasks end up sharing the Configuration held by this job.
> @SuppressWarnings({ "unchecked", "rawtypes" })
> final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat =
>         new HadoopInputFormat<NullWritable, DefaultHCatRecord>(
>                 (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter),
>                 NullWritable.class,
>                 DefaultHCatRecord.class,
>                 job);
>
> final HCatSchema inputSchema = HCatInputFormat.getTableSchema(job.getConfiguration());
>
> @SuppressWarnings("serial")
> final DataSet<T> dataSet = cluster
>         .createInput(inputFormat)
>         .flatMap(new FlatMapFunction<Tuple2<NullWritable, DefaultHCatRecord>, T>() {
>             @Override
>             public void flatMap(Tuple2<NullWritable, DefaultHCatRecord> value, Collector<T> out) throws Exception { // NOPMD
>                 final T record = createBean(value.f1, inputSchema);
>                 out.collect(record);
>             }
>         }).returns(beanClass);
>
> (...)
> {code}
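> Since all task slots of one TaskManager run inside the same JVM, another direction, sketched below, is to serialize the non-thread-safe section of {{HadoopInputFormatBase.open()}} behind a static lock, so that no two parallel tasks copy the shared {{Configuration}} at the same time. This only illustrates the locking pattern, not necessarily the fix that was committed; {{SynchronizedOpenSketch}} and {{doOpen()}} are made-up names.
> {code}
> import java.io.IOException;
>
> // Hypothetical sketch of the locking pattern: one static lock per JVM
> // serializes the section that copies the shared Hadoop Configuration.
> public abstract class SynchronizedOpenSketch<S> {
>
>     private static final Object OPEN_MUTEX = new Object();
>
>     public void open(S split) throws IOException {
>         synchronized (OPEN_MUTEX) {
>             // Create the TaskAttemptContext and initialize the record reader
>             // here; the lock prevents concurrent Configuration copies.
>             doOpen(split);
>         }
>     }
>
>     protected abstract void doOpen(S split) throws IOException;
> }
> {code}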