[ https://issues.apache.org/jira/browse/SPARK-10636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14790681#comment-14790681 ]
Glenn Strycker commented on SPARK-10636:
----------------------------------------

I didn't "forget"; I believed that "RDD = if {} else {} . something" would automatically take care of the associative property, and that anything after the final else {} would apply to both blocks. I didn't realize that braces behave similarly to parentheses and that I needed an extra pair around the whole if..else expression. Makes sense. I have now added these to my code.

This wasn't a question for "user@ first", since I really did believe there was a bug. Jira is the place for submitting bug reports, even when the resolution is user error.

> RDD filter does not work after if..then..else RDD blocks
> --------------------------------------------------------
>
>                 Key: SPARK-10636
>                 URL: https://issues.apache.org/jira/browse/SPARK-10636
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Glenn Strycker
>
> I have an RDD declaration of the following form:
> {code}
> val myRDD = if (some condition) { tempRDD1.some operations } else { tempRDD2.some operations}.filter(a => a._2._5 <= 50)
> {code}
> When I output the contents of myRDD, I found entries that clearly had a._2._5 > 50... the filter didn't work!
> If I move the filter inside of the if..then blocks, it suddenly does work:
> {code}
> val myRDD = if (some condition) { tempRDD1.some operations.filter(a => a._2._5 <= 50) } else { tempRDD2.some operations.filter(a => a._2._5 <= 50) }
> {code}
> I ran toDebugString after both of these code examples, and "filter" does appear in the DAG for the second example, but DOES NOT appear in the first DAG. Why not?
> Am I misusing the if..then..else syntax for Spark/Scala?
> Here is my actual code... ignore the crazy naming conventions and what it's doing...
> {code}
> // this does NOT work
> val myRDD = if (tempRDD2.count() > 0) {
>   tempRDD1.
>     map(a => (a._1._1, (a._1._2, a._2._1, a._2._2))).
>     leftOuterJoin(tempRDD2).
>     map(a => (a._2._1._1, (a._1, a._2._1._2, a._2._1._3, a._2._2.getOrElse(1L)))).
>     leftOuterJoin(tempRDD2).
>     map(a => ((a._2._1._1, a._1), (a._2._1._2, a._2._1._3, a._2._1._4, a._2._2.getOrElse(1L)))).
>     map(a => ((List(a._1._1, a._1._2).min, List(a._1._1, a._1._2).max),
>       (a._2._1, a._2._2,
>        if (a._1._1 <= a._1._2) { a._2._3 } else { a._2._4 },
>        if (a._1._1 <= a._1._2) { a._2._4 } else { a._2._3 },
>        a._2._3 + a._2._4)))
> } else {
>   tempRDD1.map(a => (a._1, (a._2._1, a._2._2, 1L, 1L, 2L)))
> }.
>   filter(a => a._2._5 <= 50).
>   partitionBy(partitioner).
>   setName("myRDD").
>   persist(StorageLevel.MEMORY_AND_DISK_SER)
> myRDD.checkpoint()
> println(myRDD.toDebugString)
> // (4) MapPartitionsRDD[58] at map at myProgram.scala:2120 []
> // | MapPartitionsRDD[57] at map at myProgram.scala:2119 []
> // | MapPartitionsRDD[56] at leftOuterJoin at myProgram.scala:2118 []
> // | MapPartitionsRDD[55] at leftOuterJoin at myProgram.scala:2118 []
> // | CoGroupedRDD[54] at leftOuterJoin at myProgram.scala:2118 []
> // +-(4) MapPartitionsRDD[53] at map at myProgram.scala:2117 []
> // | | MapPartitionsRDD[52] at leftOuterJoin at myProgram.scala:2116 []
> // | | MapPartitionsRDD[51] at leftOuterJoin at myProgram.scala:2116 []
> // | | CoGroupedRDD[50] at leftOuterJoin at myProgram.scala:2116 []
> // | +-(4) MapPartitionsRDD[49] at map at myProgram.scala:2115 []
> // | | | clusterGraphWithComponentsRDD MapPartitionsRDD[28] at reduceByKey at myProgram.scala:1689 []
> // | | | CachedPartitions: 4; MemorySize: 1176.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
> // | | | CheckpointRDD[29] at count at myProgram.scala:1701 []
> // | +-(4) clusterStatsRDD MapPartitionsRDD[16] at distinct at myProgram.scala:383 []
> // | | CachedPartitions: 4; MemorySize: 824.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
> // | | CheckpointRDD[17] at count at myProgram.scala:394 []
> // +-(4) clusterStatsRDD MapPartitionsRDD[16] at distinct at myProgram.scala:383 []
> // | CachedPartitions: 4; MemorySize: 824.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
> // | CheckpointRDD[17] at count at myProgram.scala:394 []
> // this DOES work!
> val myRDD = if (tempRDD2.count() > 0) {
>   tempRDD1.
>     map(a => (a._1._1, (a._1._2, a._2._1, a._2._2))).
>     leftOuterJoin(tempRDD2).
>     map(a => (a._2._1._1, (a._1, a._2._1._2, a._2._1._3, a._2._2.getOrElse(1L)))).
>     leftOuterJoin(tempRDD2).
>     map(a => ((a._2._1._1, a._1), (a._2._1._2, a._2._1._3, a._2._1._4, a._2._2.getOrElse(1L)))).
>     map(a => ((List(a._1._1, a._1._2).min, List(a._1._1, a._1._2).max),
>       (a._2._1, a._2._2,
>        if (a._1._1 <= a._1._2) { a._2._3 } else { a._2._4 },
>        if (a._1._1 <= a._1._2) { a._2._4 } else { a._2._3 },
>        a._2._3 + a._2._4))).
>     filter(a => a._2._5 <= 50)
> } else {
>   tempRDD1.map(a => (a._1, (a._2._1, a._2._2, 1L, 1L, 2L))).
>     filter(a => a._2._5 <= 50)
> }.
>   partitionBy(partitioner).
>   setName("myRDD").
>   persist(StorageLevel.MEMORY_AND_DISK_SER)
> myRDD.checkpoint()
> println(myRDD.toDebugString)
> // (4) MapPartitionsRDD[59] at filter at myProgram.scala:2121 []
> // | MapPartitionsRDD[58] at map at myProgram.scala:2120 []
> // | MapPartitionsRDD[57] at map at myProgram.scala:2119 []
> // | MapPartitionsRDD[56] at leftOuterJoin at myProgram.scala:2118 []
> // | MapPartitionsRDD[55] at leftOuterJoin at myProgram.scala:2118 []
> // | CoGroupedRDD[54] at leftOuterJoin at myProgram.scala:2118 []
> // +-(4) MapPartitionsRDD[53] at map at myProgram.scala:2117 []
> // | | MapPartitionsRDD[52] at leftOuterJoin at myProgram.scala:2116 []
> // | | MapPartitionsRDD[51] at leftOuterJoin at myProgram.scala:2116 []
> // | | CoGroupedRDD[50] at leftOuterJoin at myProgram.scala:2116 []
> // | +-(4) MapPartitionsRDD[49] at map at myProgram.scala:2115 []
> // | | | clusterGraphWithComponentsRDD MapPartitionsRDD[28] at reduceByKey at myProgram.scala:1689 []
> // | | | CachedPartitions: 4; MemorySize: 1176.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
> // | | | CheckpointRDD[29] at count at myProgram.scala:1701 []
> // | +-(4) clusterStatsRDD MapPartitionsRDD[16] at distinct at myProgram.scala:383 []
> // | | CachedPartitions: 4; MemorySize: 824.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
> // | | CheckpointRDD[17] at count at myProgram.scala:394 []
> // +-(4) clusterStatsRDD MapPartitionsRDD[16] at distinct at myProgram.scala:383 []
> // | CachedPartitions: 4; MemorySize: 824.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
> // | CheckpointRDD[17] at count at myProgram.scala:394 []
> {code}
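
For anyone who finds this ticket later, here is a minimal sketch of the parsing behaviour described in the comment, using plain Scala Lists instead of RDDs so it runs without Spark. The object name, `first`, `second`, and the two method names are made up for illustration and are not from the original program. It shows that a method call written after the closing brace of the else block attaches to the else block only, and that wrapping the whole if..else in parentheses applies the call to both branches.

```scala
object IfElseChaining {
  // Hypothetical stand-ins for tempRDD1 / tempRDD2 (plain Lists, not RDDs).
  val first: List[Int]  = List(10, 60, 70)
  val second: List[Int] = List(20, 80)

  // Parsed as: if (useFirst) { first } else ({ second }.filter(_ <= 50))
  // so the filter only runs when the else branch is taken.
  def unparenthesized(useFirst: Boolean): List[Int] =
    if (useFirst) { first } else { second }.filter(_ <= 50)

  // The extra parentheses make the whole if..else the receiver of .filter.
  def parenthesized(useFirst: Boolean): List[Int] =
    (if (useFirst) { first } else { second }).filter(_ <= 50)

  def main(args: Array[String]): Unit = {
    println(unparenthesized(useFirst = true))   // List(10, 60, 70), filter skipped
    println(unparenthesized(useFirst = false))  // List(20)
    println(parenthesized(useFirst = true))     // List(10)
    println(parenthesized(useFirst = false))    // List(20)
  }
}
```

The same reasoning should apply to every call chained after the closing brace, so in the quoted snippets partitionBy, setName, and persist would presumably need the parenthesized form as well.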