Re: How to return a pair RDD from an RDD that has foreachPartition applied?

2015-11-18 Thread swetha kasireddy
Looks like I can use mapPartitions, but can it be done using
foreachPartition?
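
For context: foreachPartition is an action whose function returns Unit, so it
cannot hand back an RDD, whereas mapPartitions is a transformation that
returns a new RDD. Below is a minimal sketch of the mapPartitions route that
does the save as a side effect. It assumes a simplified record shape
(key, dayCounts). SketchLoader is a hypothetical stand-in for DataLoaderImpl
(only the saveDataPoints name comes from the thread), and the metric string
format is made up for illustration.

```scala
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for DataLoaderImpl; only the saveDataPoints name is from the thread.
class SketchLoader {
  def saveDataPoints(metrics: java.util.List[String]): Unit =
    println(s"saving ${metrics.size} data points")
}

object SaveInsideMapPartitions {
  // Assumed, simplified record shape: (key, dayCounts).
  type Record = (String, java.util.LinkedHashMap[java.lang.Long, java.lang.Float])

  // Saves one partition as a side effect and hands back the pairs it produced.
  def saveAndEmit(partition: Iterator[Record]): Iterator[(String, (Long, Float))] = {
    val loader = new SketchLoader // created on the executor, once per partition
    val metrics = new java.util.ArrayList[String]()
    val pairs = ArrayBuffer.empty[(String, (Long, Float))]
    partition.foreach { case (key, dayCounts) =>
      val itr = dayCounts.entrySet().iterator()
      while (itr.hasNext) {
        val entry = itr.next()
        val ts = entry.getKey.longValue
        val count = entry.getValue.floatValue
        pairs += ((key, (ts, count)))
        metrics.add(s"$key:$ts:$count") // illustrative format, not from the thread
      }
    }
    loader.saveDataPoints(metrics)
    pairs.iterator
  }

  // mapPartitions returns an RDD, so the pairs can be used downstream.
  def saveDataPointsBatchNew(records: RDD[Record]): RDD[(String, (Long, Float))] =
    records.mapPartitions(partition => saveAndEmit(partition))
}
```

Because mapPartitions is lazy, nothing is saved until an action runs on the
returned RDD, and the save repeats whenever a partition is recomputed. The
sketch also buffers each partition's pairs in memory before returning them.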



Re: How to return a pair RDD from an RDD that has foreachPartition applied?

2015-11-18 Thread Sathish Kumaran Vairavelu
I think you can use mapPartitions to return the pair RDD, followed by
foreachPartition to save it.
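
A minimal sketch of that suggestion, under the same kind of assumptions: a
simplified record shape (key, dayCounts), a hypothetical StubLoader standing
in for DataLoaderImpl, and an illustrative metric string format. The cache()
call is a design choice of this sketch, not something the thread prescribes.

```scala
import org.apache.spark.rdd.RDD
import scala.collection.JavaConverters._

// Hypothetical stand-in for DataLoaderImpl; only the saveDataPoints name is from the thread.
class StubLoader {
  def saveDataPoints(metrics: java.util.List[String]): Unit =
    println(s"saving ${metrics.size} data points")
}

object ExtractThenSave {
  // Assumed, simplified record shape: (key, dayCounts).
  type Record = (String, java.util.LinkedHashMap[java.lang.Long, java.lang.Float])

  // Lazily turns one partition into (key, (timestamp, count)) pairs; no side effects.
  def toPairs(partition: Iterator[Record]): Iterator[(String, (Long, Float))] =
    partition.flatMap { case (key, dayCounts) =>
      dayCounts.asScala.iterator.map { case (ts, count) =>
        (key, (ts.longValue, count.floatValue))
      }
    }

  def saveDataPointsBatchNew(records: RDD[Record]): RDD[(String, (Long, Float))] = {
    // Transformation: builds the pair RDD. cache() lets the save below and any
    // later use of the RDD reuse the computed pairs (memory permitting).
    val pairs = records.mapPartitions(partition => toPairs(partition)).cache()

    // Action: saves each partition and returns Unit.
    pairs.foreachPartition { partition =>
      val loader = new StubLoader // created on the executor, once per partition
      val metrics = new java.util.ArrayList[String]()
      partition.foreach { case (key, (ts, count)) => metrics.add(s"$key:$ts:$count") }
      loader.saveDataPoints(metrics)
    }

    pairs // still available to the caller as a pair RDD
  }
}
```

Keeping the save in foreachPartition means it runs exactly when this method is
called, while the returned RDD stays a plain transformation result that the
caller can join, reduce, or save again.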



How to return a pair RDD from an RDD that has foreachPartition applied?

2015-11-17 Thread swetha
Hi,

How can I return an RDD of key/value pairs from an RDD that has had
foreachPartition applied? My code is something like the following. It looks
like foreachPartition can only have a return type of Unit. How do I apply
foreachPartition to do a save and at the same time return a pair RDD?

def saveDataPointsBatchNew(records: RDD[(String, (Long,
    java.util.LinkedHashMap[java.lang.Long, java.lang.Float],
    java.util.LinkedHashMap[java.lang.Long, java.lang.Float],
    java.util.HashSet[java.lang.String], Boolean))]) = {
  records.foreachPartition({ partitionOfRecords =>
    val dataLoader = new DataLoaderImpl();
    var metricList = new java.util.ArrayList[String]();
    var storageTimeStamp = 0L

    if (partitionOfRecords != null) {
      partitionOfRecords.foreach(record => {
        if (record._2._1 == 0L) {
          val entrySet = record._2._3.entrySet()
          val itr = entrySet.iterator();
          while (itr.hasNext()) {
            val entry = itr.next();
            storageTimeStamp = entry.getKey.toLong
            val dayCounts = entry.getValue
            metricsDayCounts += record._1 -> (storageTimeStamp, dayCounts.toFloat)
          }
        }
      })
    }

    //Code to insert the last successful batch/streaming timestamp  ends
    dataLoader.saveDataPoints(metricList);
    metricList = null

  })
}



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-return-a-pair-RDD-from-an-RDD-that-has-foreachPartition-applied-tp25411.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org