Hi,
We are getting a ConcurrentModificationException. The complete stack trace
is as follows:

org.apache.flink.optimizer.CompilerException: Error translating node 'Data Source "at compute(ArpackSVD.java:367) (org.apache.flink.api.java.io.CollectionInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (akka.actor.ReflectiveDynamicAccess)
_pm (akka.actor.ActorSystemImpl)
actorSystem (org.apache.flink.client.program.Client)
client (org.apache.flink.client.program.ContextEnvironment)
context (org.apache.flink.api.java.operators.MapOperator)
matrix (flink.pca.impl.svd.ArpackSVD)
this$0 (flink.pca.impl.svd.ArpackSVD$ArpackContext)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
    at org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:86)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:166)
    at org.apache.flink.client.program.Client.getJobGraph(Client.java:534)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:347)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:804)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
    at flink.pca.impl.svd.ArpackSVD.compute(ArpackSVD.java:379)
    at flink.pca.impl.PCA.computeSVD(PCA.java:110)
    at flink.pca.impl.PCA.project(PCA.java:35)
    at flink.pca.impl.PCASystemTest.runFlinkJob(PCASystemTest.java:90)
    at flink.pca.impl.PCASystemTest.main(PCASystemTest.java:61)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (akka.actor.ReflectiveDynamicAccess)
_pm (akka.actor.ActorSystemImpl)
actorSystem (org.apache.flink.client.program.Client)
client (org.apache.flink.client.program.ContextEnvironment)
context (org.apache.flink.api.java.operators.MapOperator)
matrix (flink.pca.impl.svd.ArpackSVD)
this$0 (flink.pca.impl.svd.ArpackSVD$ArpackContext)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:193)
    at org.apache.flink.api.java.io.CollectionInputFormat.writeObject(CollectionInputFormat.java:88)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
    at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:259)
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:890)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:286)
    ... 30 more
Caused by: java.util.ConcurrentModificationException
    at java.util.Vector$Itr.checkForComodification(Vector.java:1184)
    at java.util.Vector$Itr.next(Vector.java:1137)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    ... 73 more



Can anyone explain why this happens or how to fix it? We searched online,
but everything we found concerns problems with serializing a broadcast
variable. We use Flink bulk iterations, and this variable is broadcast to
both the map and the reduce in a single dataflow.
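
Reading the serialization trace from the bottom up, the object Kryo starts
from appears to be an ArpackSVD$ArpackContext whose this$0 field points at the
enclosing ArpackSVD. From there Kryo follows the matrix field (a MapOperator)
into the ContextEnvironment and the Client, and ends up in the class loader's
classes Vector, which is where the ConcurrentModificationException is thrown.
This suggests, without being certain, that ArpackContext is a non-static inner
class and drags its outer instance along. The sketch below is not our actual
code: Outer, HeavyHandle, InnerContext and NestedContext are hypothetical
stand-ins, and it only demonstrates the plain Java behaviour that a non-static
inner class keeps a hidden reference to its enclosing instance while a static
nested class does not.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class InnerClassCapture {

    // Hypothetical stand-in for a heavy handle such as an operator that
    // references the execution environment. This is not a Flink class.
    static class HeavyHandle {
        final List<String> internals = new ArrayList<>();
    }

    // Hypothetical stand-in for the outer class (ArpackSVD in the trace).
    static class Outer {
        final HeavyHandle matrix = new HeavyHandle();

        // Non-static inner class: it reads a field of the enclosing
        // instance, so it must keep a hidden reference to that instance.
        class InnerContext {
            int iterations = 10;

            int handleSize() {
                return matrix.internals.size();
            }
        }

        // Static nested class: it has no enclosing instance at all and
        // only carries the plain data it declares.
        static class NestedContext {
            int iterations = 10;
        }
    }

    // Returns true if the given class declares a field whose type is Outer,
    // which is how the hidden outer reference shows up via reflection.
    static boolean capturesOuter(Class<?> type) {
        for (Field field : type.getDeclaredFields()) {
            if (field.getType() == Outer.class) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("InnerContext captures Outer: "
                + capturesOuter(Outer.InnerContext.class));
        System.out.println("NestedContext captures Outer: "
                + capturesOuter(Outer.NestedContext.class));
    }
}
```

If this is indeed what happens in our case, declaring ArpackContext as a
static nested class, or passing it only the plain values it needs instead of
the operator, might keep the environment out of the serialized object graph.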

Thanks & Regards
Biplob Biswas
