OK, I think I've figured it out.

It seems to be a bug which has been reported at: 
https://issues.apache.org/jira/browse/SPARK-2823 and  
https://github.com/apache/spark/pull/1763.


As it says: "If the users set “spark.default.parallelism” and the value is 
different with the EdgeRDD partition number, GraphX jobs will throw:
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of 
partitions"


So my quick fix is to repartition the EdgeRDD to exactly the number of 
parallelism. But I think this would lead to much network communication.


So is there any other better solutions?


Thanks a lot!


Best,
Bin

在 2014-08-06 04:54:39,"Bin" <wubin_phi...@126.com> 写道:

Hi All,


Finally I found that the problem occured when I called the graphx lib:


"
Exception in thread "main" java.lang.IllegalArgumentException: Can't zip RDDs 
with unequal numbers of partitions
        at 
org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:56)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1094)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:703)
        at adsorption$.adsorption(adsorption.scala:138)
        at adsorption$.main(adsorption.scala:64)
        at adsorption.main(adsorption.scala)
"


The source codes:


"
129         val adsorptionGraph: Graph[(Int, Map[VertexId, Double], Double, 
Int, String), Double] = graph
130         .outerJoinVertices(graph.inDegrees){
131             case (vid, u, inDegOpt) => (u._1, u._2, 1.0, 
inDegOpt.getOrElse(0), u._3)
132         }
133         //.mapVertices((vid, v_attr) => (v_attr._1, v_attr._2, 1.0, 0))
134         .cache()
135 
136         println("After transforming into adsorption graph 
****************************************************")
137 
138         adsorptionGraph.triplets.foreach(tri=>println())
"


Any advice?


Thanks a lot!


Best,
Bin


Reply via email to