[ https://issues.apache.org/jira/browse/SPARK-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17405069#comment-17405069 ]
Melitta Dragaschnig commented on SPARK-16087:
---------------------------------------------

Here's a minimal example that reproduces the problem, using data from Cassandra tables.

The keyspace definition:
{code:java}
create keyspace if not exists test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
{code}
and the code:
{code:java}
import com.datastax.spark.connector.{SomeColumns, toRDDFunctions, toSparkContextFunctions}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

object UnionTest {

  case class AnimalCount(name: String, count: Long)
  case class Animal(name: String)

  def main(args: Array[String]): Unit = {
    val keyspace = "test"
    val spark = SparkSession.builder().master("local[*]").appName("repro")
      //.config("spark.driver.host", "127.0.0.1") // using either of those two lines
      //.config("spark.driver.host", "localhost") // removes the problem
      .getOrCreate()

    createTables(spark, keyspace)

    import spark.implicits._
    val t1 = spark.sparkContext.cassandraTable[AnimalCount](keyspace, "region1").toDS().as[AnimalCount]
    val t2 = spark.sparkContext.cassandraTable[AnimalCount](keyspace, "region2").toDS().as[AnimalCount]
    val t3 = spark.sparkContext.cassandraTable[AnimalCount](keyspace, "region3").toDS().as[AnimalCount]
    val airborne = spark.sparkContext.cassandraTable[Animal](keyspace, "airborne").toDS().as[Animal]

    val res = Seq(t1, t2).reduce(_ union _)
      .join(airborne, "name") // counting only airborne creatures in those regions
      .union(t3.toDF())       // removing this line of code removes the problem

    val counts = res.groupBy("name").agg(sum("count").as("total"))
    counts.show(5, false)
  }

  def createTables(spark: SparkSession, keyspace: String): Unit = {
    val collection = spark.sparkContext.parallelize(Seq(AnimalCount("dog", 50), AnimalCount("cow", 60)))
    collection.saveAsCassandraTable(keyspace, "region1", SomeColumns("name", "count"))

    val collection2 = spark.sparkContext.parallelize(Seq(AnimalCount("dog", 40), AnimalCount("bird", 8)))
    collection2.saveAsCassandraTable(keyspace, "region2", SomeColumns("name", "count"))

    val collection3 = spark.sparkContext.parallelize(Seq(AnimalCount("cow", 2), AnimalCount("bird", 80)))
    collection3.saveAsCassandraTable(keyspace, "region3", SomeColumns("name", "count"))

    val airborne = spark.sparkContext.parallelize(Seq(Animal("bird")))
    airborne.saveAsCassandraTable(keyspace, "airborne", SomeColumns("name"))
  }
}
{code}
Removing the join and doing a simple union like this
{code:java}
val res = Seq(t1, t2, t3).reduce(_ union _)
{code}
also removes the hanging problem.
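Spelled out, the workaround from the commented-out builder lines above looks like this; a minimal sketch, and either host value appears to avoid the hang in this setup:
{code:java}
// Sketch of the workaround: pinning spark.driver.host explicitly
// (either "127.0.0.1" or "localhost") makes the job complete instead of hanging.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("repro")
  .config("spark.driver.host", "127.0.0.1") // or "localhost"
  .getOrCreate()
{code}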
> Spark Hangs When Using Union With Persisted Hadoop RDD
> ------------------------------------------------------
>
>                 Key: SPARK-16087
>                 URL: https://issues.apache.org/jira/browse/SPARK-16087
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.4.1, 1.6.1, 2.0.1, 3.0.1
>            Reporter: Kevin Conaway
>            Priority: Critical
>              Labels: bulk-closed
>         Attachments: SPARK-16087.dump.log, SPARK-16087.log, Screen Shot 2016-06-21 at 4.27.26 PM.png, Screen Shot 2016-06-21 at 4.27.35 PM.png, part-00000, part-00001, spark-16087.tar.gz
>
>
> Spark hangs when materializing a persisted RDD that was built from a Hadoop sequence file and then union-ed with a similar RDD.
> Below is a small file that exhibits the issue:
> {code:java}
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.serializer.KryoSerializer;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
>
> public class SparkBug {
>     public static void main(String [] args) throws Exception {
>         JavaSparkContext sc = new JavaSparkContext(
>             new SparkConf()
>                 .set("spark.serializer", KryoSerializer.class.getName())
>                 .set("spark.master", "local[*]")
>                 .setAppName(SparkBug.class.getName())
>         );
>
>         JavaPairRDD<LongWritable, BytesWritable> rdd1 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-00000",
>             LongWritable.class,
>             BytesWritable.class
>         ).mapToPair(new PairFunction<Tuple2<LongWritable, BytesWritable>, LongWritable, BytesWritable>() {
>             @Override
>             public Tuple2<LongWritable, BytesWritable> call(Tuple2<LongWritable, BytesWritable> tuple) throws Exception {
>                 return new Tuple2<>(
>                     new LongWritable(tuple._1.get()),
>                     new BytesWritable(tuple._2.copyBytes())
>                 );
>             }
>         }).persist(
>             StorageLevel.MEMORY_ONLY()
>         );
>
>         System.out.println("Before union: " + rdd1.count());
>
>         JavaPairRDD<LongWritable, BytesWritable> rdd2 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-00001",
>             LongWritable.class,
>             BytesWritable.class
>         );
>
>         JavaPairRDD<LongWritable, BytesWritable> joined = rdd1.union(rdd2);
>
>         System.out.println("After union: " + joined.count());
>     }
> }
> {code}
> You'll need to upload the attached part-00000 and part-00001 to a local hdfs instance (I'm just using a dummy [Single Node Cluster|http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html] locally).
> Some things to note:
> - It does not hang if rdd1 is not persisted
> - It does not hang if rdd1 is not materialized (via calling rdd1.count()) before the union-ed RDD is materialized
> - It does not hang if the mapToPair() transformation is removed.
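For reference, the same access pattern as a minimal Scala sketch, with the three observations from the quoted notes marked inline as comments. It assumes the same local HDFS paths and storage level as the Java program above and has not been re-verified separately:
{code:java}
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.storage.StorageLevel

object SparkBugSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf()
        .set("spark.serializer", classOf[KryoSerializer].getName)
        .setMaster("local[*]")
        .setAppName("SPARK-16087-sketch"))

    val rdd1 = sc
      .sequenceFile("hdfs://localhost:9000/part-00000", classOf[LongWritable], classOf[BytesWritable])
      // Note 3: dropping this copying map step avoids the hang
      .map { case (k, v) => (new LongWritable(k.get()), new BytesWritable(v.copyBytes())) }
      // Note 1: dropping the persist avoids the hang
      .persist(StorageLevel.MEMORY_ONLY)

    // Note 2: materializing rdd1 here, before the union is materialized, is needed to trigger the hang
    println("Before union: " + rdd1.count())

    val rdd2 = sc.sequenceFile("hdfs://localhost:9000/part-00001", classOf[LongWritable], classOf[BytesWritable])

    val unioned = rdd1.union(rdd2)
    println("After union: " + unioned.count())
  }
}
{code}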