[ 
https://issues.apache.org/jira/browse/SPARK-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17405069#comment-17405069
 ] 

Melitta Dragaschnig commented on SPARK-16087:
---------------------------------------------

Here's a minimal example that reproduces the problem, using data from Cassandra 
tables.

The keyspace definition:
{code:sql}
create keyspace if not exists test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
{code}
and the code:

 
{code:java}
import com.datastax.spark.connector.{SomeColumns, toRDDFunctions, toSparkContextFunctions}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

object UnionTest {
  case class AnimalCount(name: String, count: Long)
  case class Animal(name: String)

  def main(args: Array[String]): Unit = {
    val keyspace = "test"

    val spark = SparkSession.builder().master("local[*]").appName("repro")
      //.config("spark.driver.host", "127.0.0.1") // using either of those two lines
      //.config("spark.driver.host", "localhost") // removes the problem
      .getOrCreate()

    createTables(spark, keyspace)

    import spark.implicits._
    val t1 = spark.sparkContext.cassandraTable[AnimalCount](keyspace, "region1").toDS().as[AnimalCount]
    val t2 = spark.sparkContext.cassandraTable[AnimalCount](keyspace, "region2").toDS().as[AnimalCount]
    val t3 = spark.sparkContext.cassandraTable[AnimalCount](keyspace, "region3").toDS().as[AnimalCount]
    val airborne = spark.sparkContext.cassandraTable[Animal](keyspace, "airborne").toDS().as[Animal]

    val res = Seq(t1, t2).reduce(_ union _)
      .join(airborne, "name") // counting only airborne creatures in those regions
      .union(t3.toDF())       // removing this line of code removes the problem

    val counts = res.groupBy("name").agg(sum("count").as("total"))
    counts.show(5, false)
  }

  def createTables(spark: SparkSession, keyspace: String): Unit = {
    val collection = spark.sparkContext.parallelize(Seq(AnimalCount("dog", 50), AnimalCount("cow", 60)))
    collection.saveAsCassandraTable(keyspace, "region1", SomeColumns("name", "count"))

    val collection2 = spark.sparkContext.parallelize(Seq(AnimalCount("dog", 40), AnimalCount("bird", 8)))
    collection2.saveAsCassandraTable(keyspace, "region2", SomeColumns("name", "count"))

    val collection3 = spark.sparkContext.parallelize(Seq(AnimalCount("cow", 2), AnimalCount("bird", 80)))
    collection3.saveAsCassandraTable(keyspace, "region3", SomeColumns("name", "count"))

    val airborne = spark.sparkContext.parallelize(Seq(Animal("bird")))
    airborne.saveAsCassandraTable(keyspace, "airborne", SomeColumns("name"))
  }
}
{code}
 

 

Removing the join and doing a simple union instead, like this:
{code:java}
val res = Seq(t1, t2, t3).reduce(_ union _)
{code}
also makes the hang disappear.
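For completeness, the workaround referenced in the commented-out lines above is simply pinning the driver host when building the SparkSession. A minimal sketch (assuming a local[*] master, as in the repro; either host value seems to avoid the hang in my setup):
{code:java}
import org.apache.spark.sql.SparkSession

// Same session setup as the repro, but with spark.driver.host set explicitly.
// With this config in place, the union/join example above completes instead of hanging.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("repro")
  .config("spark.driver.host", "127.0.0.1") // "localhost" also worked for me
  .getOrCreate()
{code}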

> Spark Hangs When Using Union With Persisted Hadoop RDD
> ------------------------------------------------------
>
>                 Key: SPARK-16087
>                 URL: https://issues.apache.org/jira/browse/SPARK-16087
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.4.1, 1.6.1, 2.0.1, 3.0.1
>            Reporter: Kevin Conaway
>            Priority: Critical
>              Labels: bulk-closed
>         Attachments: SPARK-16087.dump.log, SPARK-16087.log, Screen Shot 
> 2016-06-21 at 4.27.26 PM.png, Screen Shot 2016-06-21 at 4.27.35 PM.png, 
> part-00000, part-00001, spark-16087.tar.gz
>
>
> Spark hangs when materializing a persisted RDD that was built from a Hadoop 
> sequence file and then union-ed with a similar RDD.
> Below is a small file that exhibits the issue:
> {code:java}
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.serializer.KryoSerializer;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
>
> public class SparkBug {
>     public static void main(String [] args) throws Exception {
>         JavaSparkContext sc = new JavaSparkContext(
>             new SparkConf()
>                 .set("spark.serializer", KryoSerializer.class.getName())
>                 .set("spark.master", "local[*]")
>                 .setAppName(SparkBug.class.getName())
>         );
>
>         JavaPairRDD<LongWritable, BytesWritable> rdd1 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-00000",
>             LongWritable.class,
>             BytesWritable.class
>         ).mapToPair(new PairFunction<Tuple2<LongWritable, BytesWritable>, LongWritable, BytesWritable>() {
>             @Override
>             public Tuple2<LongWritable, BytesWritable> call(Tuple2<LongWritable, BytesWritable> tuple) throws Exception {
>                 return new Tuple2<>(
>                     new LongWritable(tuple._1.get()),
>                     new BytesWritable(tuple._2.copyBytes())
>                 );
>             }
>         }).persist(
>             StorageLevel.MEMORY_ONLY()
>         );
>
>         System.out.println("Before union: " + rdd1.count());
>
>         JavaPairRDD<LongWritable, BytesWritable> rdd2 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-00001",
>             LongWritable.class,
>             BytesWritable.class
>         );
>
>         JavaPairRDD<LongWritable, BytesWritable> joined = rdd1.union(rdd2);
>         System.out.println("After union: " + joined.count());
>     }
> }
> {code}
> You'll need to upload the attached part-00000 and part-00001 to a local hdfs instance (I'm just using a dummy [Single Node Cluster|http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html] locally).
> Some things to note:
> - It does not hang if rdd1 is not persisted
> - It does not hang if rdd1 is not materialized (by calling rdd1.count()) before the union-ed RDD is materialized
> - It does not hang if the mapToPair() transformation is removed.


