foreachPartition is not a transformation; it is an action. If you want to transform an RDD using an iterator in each partition, then use mapPartitions.
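As for why the second map looks empty: each partition is handed to your function as a Scala Iterator, and an Iterator can be traversed only once, so the first map plus foreach has already consumed it. Below is a minimal sketch of the idea in plain Scala, without Spark, so it can be run on its own. The names IteratorReuseSketch, fakeProfile and profilePartition are made up for illustration, and fakeProfile is only a stand-in for your runPeriodicalKMeansClustering call. profilePartition has the Iterator[T] => Iterator[U] shape that mapPartitions expects.

```scala
object IteratorReuseSketch {
  // Hypothetical stand-in for runPeriodicalKMeansClustering in the quoted code.
  def fakeProfile(credentialId: Long): String = s"profile-$credentialId"

  // Same shape as the function RDD.mapPartitions takes: Iterator[T] => Iterator[U].
  def profilePartition(credentials: Iterator[Long]): Iterator[(Long, String)] = {
    // Materialize the partition once; the resulting List can be traversed repeatedly.
    val ids = credentials.toList
    val profiles = ids.map(id => (id, fakeProfile(id))) // first pass
    val secondMapping = ids.map(identity)               // second pass still sees every id
    require(secondMapping.length == ids.length)
    profiles.iterator
  }

  def main(args: Array[String]): Unit = {
    // The symptom from the question, reproduced on a bare Iterator.
    val it = Iterator(1L, 2L, 3L)
    val firstPass = it.map(_ * 10).toList  // consumes the iterator
    val secondPass = it.map(_ * 10).toList // nothing left, so this is empty
    println(s"first pass:  $firstPass")
    println(s"second pass: $secondPass")

    // The materialized version returns one result per input element.
    println(profilePartition(Iterator(1L, 2L, 3L)).toList)
  }
}
```

In Spark this would presumably be wired up as credentialsRDD.mapPartitions(IteratorReuseSketch.profilePartition), followed by an action such as collect or foreachPartition to write the results. Note that toList holds the whole partition in memory, which is fine for a short list of credential ids but worth reconsidering for large partitions.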
On Tue, Feb 28, 2017 at 8:17 PM, jeremycod <zoran.jere...@gmail.com> wrote:

> Hi,
>
> I'm trying to transform one RDD two times. I'm using foreachPartition, and
> inside it I have two map transformations. The first time it works fine and
> I get results, but the second time I call map, it behaves as if the RDD has
> no elements.
> This is my code:
>
> val credentialsIdsScala: Seq[java.lang.Long] = credentialsIds.asScala.toSeq
> println("ALL CREDENTIALS:" + credentialsIdsScala.mkString(","))
>
> val credentialsRDD: RDD[Long] = sc.parallelize(credentialsIdsScala.map { Long2long })
> val connector = CassandraConnector(sc.getConf)
> credentialsRDD.foreachPartition {
>   credentials => {
>     val userCourseKMeansProfiles: Iterator[Iterable[Tuple5[Long, String, Long, Long, String]]] =
>       credentials.map { credentialid =>
>         println("RUNNING USER PROFILE CLUSTERING FOR CREDENTIAL:" + credentialid)
>         val userCourseProfile: Iterable[Tuple5[Long, String, Long, Long, String]] =
>           runPeriodicalKMeansClustering(dbName, days, numClusters, numFeatures, credentialid)
>         userCourseProfile
>       }
>     userCourseKMeansProfiles.foreach(userProfile => {
>       val query = "INSERT INTO " + dbName + "." +
>         TablesNames.PROFILE_USERQUARTILE_FEATURES_BYPROFILE +
>         "(course, profile,date, userid, sequence) VALUES (?, ?, ?,?,?) ";
>       connector.withSessionDo {
>         session => {
>           userProfile.foreach(record => {
>             println("USER PROFILE RECORD:" + record._1 + " " + record._2 +
>               " " + record._3 + " " + record._4 + " " + record._5)
>             session.execute(query,
>               record._1.asInstanceOf[java.lang.Long], record._2.asInstanceOf[String],
>               record._3.asInstanceOf[java.lang.Long],
>               record._4.asInstanceOf[java.lang.Long], record._5.asInstanceOf[String])
>           })
>         }
>       }
>     })
>     val secondMapping = credentials.map {
>       credentialid =>
>         println("credential id:" + credentialid)
>         credentialid
>     }
>     secondMapping.foreach(cid=>println("credentialid:"+cid))
>     println("Second mapping:" + secondMapping.length)
>   }
>
> Could someone explain to me what is wrong with my code and how to fix it?
>
> Thanks,
> Zoran
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-transform-RDD-for-the-second-time-tp28441.html