substitute mapPartitions by distinct

2016-05-04 Thread Batselem
Hi, I am trying to remove duplicates from a set of RDD tuples in an iterative
algorithm. I have found that RDD distinct can be replaced by a per-partition
distinct inside a mapPartitions transformation: first I partition the RDD with a
HashPartitioner, then deduplicate each partition locally. I expect this to be
much faster in an iterative algorithm. I compared the two results and they were
equal, but I am not sure the approach is always correct. My concern is that it
does not guarantee that all duplicates are removed, because hash codes can
sometimes collide, and if duplicates end up in different partitions the
following code does not work; all identical tuples would have to land in the
same partition. Any suggestion will be appreciated.

Code:

import org.apache.spark.HashPartitioner

// Hash-partition the pair RDD so that tuples with equal keys land in the same partition
val PairRDD = inputPairs.partitionBy(new HashPartitioner(slices))

// Reference: global deduplication across all partitions
val distinctCount = PairRDD.distinct().count()

// Per-partition deduplication only
val mapPartitionCount = PairRDD.mapPartitions(iterator => {
  iterator.toList.distinct.toIterator
}, preservesPartitioning = true).count()

println("distinct : " + distinctCount)

println("mapPartitionCount : " + mapPartitionCount)






GC problem while filtering

2014-12-16 Thread Batselem
Hi, I am trying to filter a large table with 3 columns. My goal is to filter
this big table using multiple clauses. I filtered the table 3 times, but the first
filter took about 50 seconds to complete whereas the second and third
filter transformations took about 5 seconds each. I wonder if this is because of
lazy evaluation, but I had already evaluated the RDD by parsing it when I first read
the data using sc.textFile and then counting it. I got the following result:

Running times:
t1 = 50 seconds
t2 = 5 seconds
t3 = 4 seconds
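
On the lazy-evaluation point: counting an RDD built from sc.textFile does not keep
the parsed rows in memory unless the RDD is explicitly cached, so the first action
on a derived RDD still pays the full read-and-parse cost. A minimal sketch, assuming
bigtable comes from sc.textFile as described (the path and parseLine are hypothetical
placeholders):

// Without cache()/persist(), each action re-reads and re-parses the input,
// which can explain a slow first filter and fast subsequent ones.
val bigtable = sc.textFile("hdfs:///path/to/bigtable")   // hypothetical path
  .map(parseLine)                                         // hypothetical parser
  .cache()                                                // keep parsed rows in memory

bigtable.count()   // first action: reads, parses, and populates the cache
bigtable.count()   // later actions reuse the in-memory copy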
***CODE***
import org.apache.spark.broadcast.Broadcast

// clause holds the (predicate, object) patterns to match; values starting with "?" are variables
val clause = List(
  ("http://www.w3.org/1999/02/22-rdf-syntax-ns#type", "www.ssu.ac.kr#GraduateStudent"),
  ("www.ssu.ac.kr#memberOf", "?Z"),
  ("www.ssu.ac.kr#undergraduateDegreeFrom", "?Y")
)

val bcastedSubj: Broadcast[String] = sc.broadcast("?X")
val bcastedCls: Broadcast[List[(String, String)]] = sc.broadcast(clause)
var n = clause.length

val t0 = System.currentTimeMillis()

val subgraph1 = bigtable.mapPartitions(
  iterator => {
    val bcls = bcastedCls.value
    val bsubj = bcastedSubj.value
    n = bcls.length
    // Keep a row (s, grp) only if the subject matches and every clause is
    // satisfied by at least one (predicate, object) pair in grp.
    for ((s, grp) <- iterator;
         if {
           val flag =
             if (!bsubj.startsWith("?") && !bsubj.equals(s)) false
             else {
               var k = 0

               val m = grp.length
               var flag1 = true

               while (k < n) {
                 var flag2 = false
                 var l = 0
                 while (l < m) {
                   if (grp(l)._1.equals(bcls(k)._1) && grp(l)._2.equals(bcls(k)._2)) flag2 = true
                   else if (bcls(k)._1.startsWith("?") && grp(l)._2.equals(bcls(k)._2)) flag2 = true
                   else if (bcls(k)._2.startsWith("?") && grp(l)._1.equals(bcls(k)._1)) flag2 = true
                   l += 1
                 }
                 if (!flag2) flag1 = false

                 k += 1
               }

               flag1
             }

           flag
         }
    ) yield (s, grp)
  }, preservesPartitioning = true).cache()
val num1 = subgraph1.count()
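
For readability, the same matching logic can also be written with forall/exists in
place of the index-based while loops. This is only a sketch under the assumptions
above (grp is an indexed sequence of (predicate, object) pairs, and bigtable,
bcastedSubj, bcastedCls are as defined in the snippet):

// Keep a row when the broadcast subject matches and every clause is satisfied
// by at least one (predicate, object) pair of the row; "?"-prefixed values are variables.
val subgraph = bigtable.mapPartitions({ iterator =>
  val bcls  = bcastedCls.value
  val bsubj = bcastedSubj.value
  iterator.filter { case (s, grp) =>
    val subjectOk = bsubj.startsWith("?") || bsubj.equals(s)
    subjectOk && bcls.forall { case (p, o) =>
      grp.exists { case (gp, go) =>
        (gp == p && go == o) ||
        (p.startsWith("?") && go == o) ||
        (o.startsWith("?") && gp == p)
      }
    }
  }
}, preservesPartitioning = true)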


