Hi,

We are getting a ConcurrentModificationException. The complete stack trace is as follows:

org.apache.flink.optimizer.CompilerException: Error translating node 'Data Source "at compute(ArpackSVD.java:367) (org.apache.flink.api.java.io.CollectionInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (akka.actor.ReflectiveDynamicAccess)
_pm (akka.actor.ActorSystemImpl)
actorSystem (org.apache.flink.client.program.Client)
client (org.apache.flink.client.program.ContextEnvironment)
context (org.apache.flink.api.java.operators.MapOperator)
matrix (flink.pca.impl.svd.ArpackSVD)
this$0 (flink.pca.impl.svd.ArpackSVD$ArpackContext)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
    at org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:86)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:166)
    at org.apache.flink.client.program.Client.getJobGraph(Client.java:534)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:347)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:804)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
    at flink.pca.impl.svd.ArpackSVD.compute(ArpackSVD.java:379)
    at flink.pca.impl.PCA.computeSVD(PCA.java:110)
    at flink.pca.impl.PCA.project(PCA.java:35)
    at flink.pca.impl.PCASystemTest.runFlinkJob(PCASystemTest.java:90)
    at flink.pca.impl.PCASystemTest.main(PCASystemTest.java:61)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (akka.actor.ReflectiveDynamicAccess)
_pm (akka.actor.ActorSystemImpl)
actorSystem (org.apache.flink.client.program.Client)
client (org.apache.flink.client.program.ContextEnvironment)
context (org.apache.flink.api.java.operators.MapOperator)
matrix (flink.pca.impl.svd.ArpackSVD)
this$0 (flink.pca.impl.svd.ArpackSVD$ArpackContext)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:193)
    at org.apache.flink.api.java.io.CollectionInputFormat.writeObject(CollectionInputFormat.java:88)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
    at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:259)
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:890)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:286)
    ... 30 more
Caused by: java.util.ConcurrentModificationException
    at java.util.Vector$Itr.checkForComodification(Vector.java:1184)
    at java.util.Vector$Itr.next(Vector.java:1137)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    ... 73 more

Can anyone explain why this happens or how to fix it? We searched online, but all we found were reports of problems serializing broadcast variables. We use Flink bulk iterations, and this variable is broadcast to both the map and the reduce in a single dataflow.

Thanks & Regards
Biplob Biswas