Hi there,
This will be a little long, so please bear with me. There is a buildable example
available at https://github.com/sfcoy/sfcoy-spark-cce-test.
Say I have the following three tables:

Machines
Id,MachineType
100001,A
100002,B
200003,B
200004,A
200005,B

Bolts
MachineType,Description
A,20 x M5
A,30 x M5
B,"2"" x 1/4"""
B,"2"" x 1/2"""
B,"2"" x 3/8"""
A,40 x M6
A,50 x M10
B,"1"" x 1/8"""

Nuts
MachineType,Description
A,M5
A,M6
B,"1/4"""
B,"1/8"""
B,"3/8"""
The objective is to create lists of Machines by Id, with all of their bolts and
nuts listed on the same line:
100001, 20 x M5, 30 x M5, 40 x M6, 50 x M10, M5, M6
The output is further categorised by the first 5 digits of the machine id,
although that seems immaterial to this problem.
In practice I’m dealing with ~70 million machines across a couple of hundred
thousand machine types - therefore Spark!
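For context, the Machine, Nut and Bolt bean classes aren't reproduced below -
the real ones are in the linked repo. Going by the CSV headers and the
accessors used in the code, their assumed shape is roughly:

public class Machine implements java.io.Serializable {
    private String id;          // e.g. "100001"
    private String machineType; // e.g. "A"

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getMachineType() { return machineType; }
    public void setMachineType(String machineType) { this.machineType = machineType; }
}

// Nut and Bolt are analogous: a machineType plus a description field.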
The code to do this looks like:
final Dataset<Machine> machineRecords = sparkSession
    .read()
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("src/main/data/machines.csv")
    .as(Encoders.bean(Machine.class))
    .persist();

final int workerCount = sparkContext.defaultParallelism();

// Collect the nuts into a List per machine type.
final JavaPairRDD<String, List<Nut>> nutsByMachine = sparkSession
    .read()
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("src/main/data/nuts.csv")
    .as(Encoders.bean(Nut.class))
    .toJavaRDD()
    .mapToPair(nut -> new Tuple2<>(nut.getMachineType(), nut))
    .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
    .combineByKey(SparkCCETest::createListAndCombine,
        SparkCCETest::mergeValues,
        SparkCCETest::mergeCombiners)
    .persist(StorageLevel.MEMORY_AND_DISK());

// Likewise for the bolts.
final JavaPairRDD<String, List<Bolt>> boltsByMachine = sparkSession
    .read()
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("src/main/data/bolts.csv")
    .as(Encoders.bean(Bolt.class))
    .toJavaRDD()
    .mapToPair(bolt -> new Tuple2<>(bolt.getMachineType(), bolt))
    .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
    .combineByKey(SparkCCETest::createListAndCombine,
        SparkCCETest::mergeValues,
        SparkCCETest::mergeCombiners)
    .persist(StorageLevel.MEMORY_AND_DISK());

// Join each machine with its nut and bolt lists, then bucket by export file.
machineRecords
    .toJavaRDD()
    .mapToPair(machine -> new Tuple2<>(machine.getMachineType(), machine))
    .join(nutsByMachine)
    .join(boltsByMachine)
    .map(Tuple2::_2)
    .map(tuples -> new Tuple3<>(tuples._1._1, tuples._1._2, tuples._2))
    .mapToPair(machineWithNutsBolts ->
        new Tuple2<>(exportFileFor(machineWithNutsBolts._1()), machineWithNutsBolts))
    .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
    .foreachPartition(machineIterator -> { // <- line 77
        ///...
    });
// The first 5 digits of the machine id select the output file.
static String exportFileFor(Machine machine) {
    return machine.getId().substring(0, 5);
}

// The three combineByKey functions: createCombiner, mergeValue, mergeCombiners.
static <T> List<T> createListAndCombine(T v) {
    List<T> c = new ArrayList<>();
    c.add(v);
    return c;
}

static <T> List<T> mergeValues(List<T> c, T v) {
    c.add(v);
    return c;
}

static <T> List<T> mergeCombiners(List<T> c1, List<T> c2) {
    c1.addAll(c2);
    return c1;
}
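
The foreachPartition body is elided above; purely to make the shape of the
joined data concrete, a hypothetical version might look like this (writeLine
is a made-up helper, not in the repo):

machineIterator.forEachRemaining(entry -> {
    // entry is a Tuple2<String, Tuple3<Machine, List<Nut>, List<Bolt>>>
    final Tuple3<Machine, List<Nut>, List<Bolt>> t = entry._2();
    final StringJoiner line = new StringJoiner(",");
    line.add(t._1().getId());                                // machine id
    t._3().forEach(bolt -> line.add(bolt.getDescription())); // its bolts
    t._2().forEach(nut -> line.add(nut.getDescription()));   // its nuts
    writeLine(entry._1(), line.toString()); // entry._1() is the 5-digit file key
});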
Running this yields a ClassCastException:
20/05/22 14:05:31 WARN BlockManager: Putting block rdd_47_1 failed due to exception java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut.
20/05/22 14:05:31 WARN BlockManager: Putting block rdd_47_2 failed due to exception java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut.
20/05/22 14:05:31 WARN BlockManager: Block rdd_47_2 could not be removed as it was not found on disk or in memory
20/05/22 14:05:31 WARN BlockManager: Block rdd_47_1 could not be removed as it was not found on disk or in memory
20/05/22 14:05:31 ERROR Executor: Exception in task 2.0 in stage 9.0 (TID 13)
java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut
    at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152)
    ...
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/05/22 14:05:31 ERROR Executor: Exception in task 1.0 in stage 9.0 (TID 12)
java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut
    at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152)
    ...
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
…
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1979)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1967)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1966)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1966)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:946)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:946)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:946)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2196)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2145)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2134)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:748)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2095)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2116)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2160)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:994)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:992)
    at org.apache.spark.api.java.JavaRDDLike.foreachPartition(JavaRDDLike.scala:219)
    at org.apache.spark.api.java.JavaRDDLike.foreachPartition$(JavaRDDLike.scala:218)
    at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:45)
    at org.example.SparkCCETest.main(SparkCCETest.java:77)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut
    at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1$adapted(ExternalAppendOnlyMap.scala:151)
    at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
    at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:164)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$combineByKeyWithClassTag$3(PairRDDFunctions.scala:92)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:362)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1306)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1233)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1297)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1121)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
    at org.apache.spark.rdd.CoGroupedRDD.$anonfun$compute$2(CoGroupedRDD.scala:140)
    at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:136)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:455)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:458)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Anyway, to cut a long story short, while creating this reproducer it occurred
to me to replace those generic methods at the bottom of the code with
explicitly typed versions, and that made the problem go away.
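Roughly, the typed replacements look like this (the method names here are
mine; shown for Nut, with identical versions for Bolt):

static List<Nut> createNutListAndCombine(Nut v) {
    List<Nut> c = new ArrayList<>();
    c.add(v);
    return c;
}

static List<Nut> mergeNutValues(List<Nut> c, Nut v) {
    c.add(v);
    return c;
}

static List<Nut> mergeNutCombiners(List<Nut> c1, List<Nut> c2) {
    c1.addAll(c2);
    return c1;
}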
This seems like a workaround, but does anyone think this could be a bug?
Thanks,
Steve C
P.S. I’m relatively new to Apache Spark, so if anyone thinks I’m going about
this the wrong way, I would be pleased to hear any better ideas.