[ 
https://issues.apache.org/jira/browse/SPARK-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404510#comment-17404510
 ] 

Melitta Dragaschnig edited comment on SPARK-16087 at 8/25/21, 2:26 PM:
-----------------------------------------------------------------------

I ran into this problem as well, using spark 3.1.2 and Datastax cassandra 
connector 3.1.0.

While loading some tables from cassandra, doing simple preparation steps for 
some of the dataframes, and finally union-ing the results, the job hangs 
indefinitely at the union.

As [~rrusso2007] helpfully suggested, setting
{code:java}
 --conf spark.driver.host=127.0.0.1 {code}
*fixes* the problem entirely.

 

But understanding what is happening here would really ease my mind.

I haven't yet been able to extract a minimal example to reproduce the problem 
(if possible I will add one later).

This is basically what happens in the original code:

 
{code:java}
val a = sparkContext.cassandraTable(...).filter(...).select(...)
val b = sparkContext.cassandraTable(...).filter(...).select(...)
val c = sparkContext.cassandraTable(...)
 .join(sparkContext.cassandraTable(...))
 .filter(...)
 .groupBy(...)
 .agg(sum(...))
 .select(...)
val d = sparkContext.cassandraTable(...)
 .join(sparkContext.cassandraTable(...))
 .filter(...)
 .select(...)
val e = sparkContext.cassandraTable(...)

val rows =
 Seq(a, b, c, d)
 .reduce(_ unionByName _)
 .join(sparkContext.cassandraTable(...), ...)
 .unionByName(e.drop(...))
{code}
 

As you can see, a, b, c, d are DataFrames that require a bit of simple 
preparation.

e on the other hand is simply read as-is from a Cassandra table, however this 
dataframe seems to pose the problem - *removing* the last line of code (the one 
involving e) removes the hanging.
 Also *repartitioning* e removes the hanging.

 

 


was (Author: melitta):
I ran into this problem as well, using spark 3.1.2 and Datastax cassandra 
connector 3.1.0.

While loading some tables from cassandra, doing simple preparation steps for 
some of the dataframes, and finally union-ing the results, the job hangs 
indefinitely at the union.


As [~rrusso2007] helpfully suggested, setting 
{code:java}
 --conf spark.driver.host=127.0.0.1 {code}
*fixes* the problem entirely.

 

But understanding what is happening here would really ease my mind.

I haven't yet been able to extract a minimal example to reproduce the problem 
(if possible I will add one later).

This is basically what happens in the original code:

 
{code:java}
val a = sparkContext.cassandraTable(...).filter(...).select(...)
val b = sparkContext.cassandraTable(...).filter(...).select(...)
val c = sparkContext.cassandraTable(...)
 .join(sparkContext.cassandraTable(...))
 .filter(...)
 .groupBy(...)
 .agg(sum(...))
 .select(...)
val d = sparkContext.cassandraTable(...)
 .join(sparkContext.cassandraTable(...))
 .filter(...)
 .select(...)
val e = sparkContext.cassandraTable(...)
val rows =
 Seq(a, b, c, d)
 .reduce(_ unionByName _)
 .join(sparkContext.cassandraTable(...), ...)
 .unionByName(e.drop(...))
{code}
 

As you can see, a, b, c, d are DataFrames that require a bit of simple 
preparation.

e on the other hand is simply read as-is from a Cassandra table, however this 
dataframe seems to pose the problem - *removing* the last line of code (the one 
involving e) removes the hanging.
Also *repartitioning* e removes the hanging.

 

 

> Spark Hangs When Using Union With Persisted Hadoop RDD
> ------------------------------------------------------
>
>                 Key: SPARK-16087
>                 URL: https://issues.apache.org/jira/browse/SPARK-16087
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.4.1, 1.6.1, 2.0.1, 3.0.1
>            Reporter: Kevin Conaway
>            Priority: Critical
>              Labels: bulk-closed
>         Attachments: SPARK-16087.dump.log, SPARK-16087.log, Screen Shot 
> 2016-06-21 at 4.27.26 PM.png, Screen Shot 2016-06-21 at 4.27.35 PM.png, 
> part-00000, part-00001, spark-16087.tar.gz
>
>
> Spark hangs when materializing a persisted RDD that was built from a Hadoop 
> sequence file and then union-ed with a similar RDD.
> Below is a small file that exhibits the issue:
> {code:java}
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.serializer.KryoSerializer;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
> public class SparkBug {
>     public static void main(String [] args) throws Exception {
>         JavaSparkContext sc = new JavaSparkContext(
>             new SparkConf()
>                 .set("spark.serializer", KryoSerializer.class.getName())
>                 .set("spark.master", "local[*]")
>                 .setAppName(SparkBug.class.getName())
>         );
>         JavaPairRDD<LongWritable, BytesWritable> rdd1 = sc.sequenceFile(
>            "hdfs://localhost:9000/part-00000",
>             LongWritable.class,
>             BytesWritable.class
>         ).mapToPair(new PairFunction<Tuple2<LongWritable, BytesWritable>, 
> LongWritable, BytesWritable>() {
>             @Override
>             public Tuple2<LongWritable, BytesWritable> 
> call(Tuple2<LongWritable, BytesWritable> tuple) throws Exception {
>                 return new Tuple2<>(
>                     new LongWritable(tuple._1.get()),
>                     new BytesWritable(tuple._2.copyBytes())
>                 );
>             }
>         }).persist(
>             StorageLevel.MEMORY_ONLY()
>         );
>         System.out.println("Before union: " + rdd1.count());
>         JavaPairRDD<LongWritable, BytesWritable> rdd2 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-00001",
>             LongWritable.class,
>             BytesWritable.class
>         );
>         JavaPairRDD<LongWritable, BytesWritable> joined = rdd1.union(rdd2);
>         System.out.println("After union: " + joined.count());
>     }
> }
> {code}
> You'll need to upload the attached part-00000 and part-00001 to a local hdfs 
> instance (I'm just using a dummy [Single Node 
> Cluster|http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html]
>  locally).
> Some things to note:
> - It does not hang if rdd1 is not persisted
> - It does not hang is rdd1 is not materialized (via calling rdd1.count()) 
> before the union-ed RDD is materialized
> - It does not hang if the mapToPair() transformation is removed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to