Hi,

How can I return an RDD of key/value pairs from an RDD that has
foreachPartition applied? My code looks something like the following. It
appears that foreachPartition can only have Unit as its return type. How do
I apply foreachPartition to do a save and, at the same time, return a
pair RDD?

 /**
  * Walks every partition of `records`, collects day counts into `metricsDayCounts`
  * and then calls `saveDataPoints` on a per-partition `DataLoaderImpl`.
  *
  * For each record whose first value field (`_2._1`) equals `0L`, every entry of the
  * record's second map (`_2._3`) is added to `metricsDayCounts` as
  * `recordKey -> (entryKey, entryValue)`.
  *
  * `RDD.foreachPartition` is an action whose result is `Unit`, so this method returns
  * nothing. Producing a pair RDD would require a transformation such as
  * `mapPartitions` instead of `foreachPartition`.
  *
  * @param records pair RDD keyed by a string; of the value tuple only `_1` (a `Long`)
  *                and `_3` (the second `LinkedHashMap`) are read here
  */
 def saveDataPointsBatchNew(
     records: RDD[(String, (Long,
       java.util.LinkedHashMap[java.lang.Long, java.lang.Float],
       java.util.LinkedHashMap[java.lang.Long, java.lang.Float],
       java.util.HashSet[java.lang.String],
       Boolean))]): Unit = {
   records.foreachPartition { partitionOfRecords =>
     // One loader per partition, created inside the closure.
     val dataLoader = new DataLoaderImpl()
     // NOTE(review): nothing in this method adds to metricList, so saveDataPoints
     // receives an empty list as written -- confirm whether the populating code
     // was left out of this snippet.
     val metricList = new java.util.ArrayList[String]()
     // Key of the most recently visited map entry.
     var storageTimeStamp = 0L

     // Defensive guard kept from the original code.
     if (partitionOfRecords != null) {
       partitionOfRecords.foreach { record =>
         if (record._2._1 == 0L) {
           val itr = record._2._3.entrySet().iterator()
           while (itr.hasNext()) {
             val entry = itr.next()
             storageTimeStamp = entry.getKey.toLong
             val dayCounts = entry.getValue
             // NOTE(review): metricsDayCounts is declared outside this method. If it
             // lives on the driver, updates made inside this closure presumably will
             // not be visible there -- verify where it is defined.
             metricsDayCounts += record._1 -> ((storageTimeStamp, dayCounts.toFloat))
           }
         }
       }
     }

     // Code to insert the last successful batch/streaming timestamp ends
     dataLoader.saveDataPoints(metricList)
   }
 }



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-return-a-pair-RDD-from-an-RDD-that-has-foreachPartition-applied-tp25411.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to