Specifying a custom Partitioner on RDD creation in Spark 2

2018-04-10 Thread Colin Williams
Hi,

I'm currently creating RDDs using a pattern like follows:

val rdd: RDD[String] = session.sparkContext.parallelize(longkeys).flatMap { key =>
  logInfo(s"job at key: ${key}")
  Source.fromBytes(
    S3Util.getBytes(
      S3Util.getClient(region, S3Util.getCredentialsProvider("INSTANCE", "")),
      bucket, key))
    .getLines()
}

We've been using this pattern, or something similar, to work around issues
with S3 and our Hadoop version. However, the same pattern might also be
applied to other types of data sources that don't have a connector.

This method has been working out fairly well, but I'd like more
control over how the data is partitioned from the start.



I tried to manually partition the data without a partitioner, but got
JVM exceptions about my arrays being too large for the JVM.

val keyList = groupedKeys.keys.toList
val rdd: RDD[String] =
  session.sparkContext.parallelize(keyList, keyList.length).flatMap { key =>
    logInfo(s"job at day: ${key}")
    val byteArrayBuffer = new ArrayBuffer[Byte]()
    val objectKeyList: List[String] = groupedKeys(key)
    objectKeyList.foreach { objectKey =>
      logInfo(s"working on object: ${objectKey}")
      byteArrayBuffer.appendAll(
        S3Util.getBytes(
          S3Util.getClient(region, S3Util.getCredentialsProvider("INSTANCE", "")),
          bucket, objectKey))
    }
    Source.fromBytes(byteArrayBuffer.toArray[Byte]).getLines()
  }



Then I've defined a custom partitioner based on my source data:

class dayPartitioner(keys: List[String]) extends Partitioner with Logger {

  val keyMap: Map[String, List[String]] = keys.groupBy(_.substring(0, 10))
  val partitions = keyMap.keySet.size
  val partitionMap: Map[String, Int] = keyMap.keys.zipWithIndex.toMap

  override def getPartition(key: Any): Int = {
    val keyString = key.asInstanceOf[String]
    val partitionKey = keyString.substring(0, 10)
    partitionMap(partitionKey)
  }

  override def numPartitions: Int = partitions
}
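
For reference, here is a minimal sketch (not part of the original message) of how
a partitioner like this is typically applied without a custom RDD class: key the
RDD by the S3 object key and call partitionBy. The names objectKeys, region,
bucket, and S3Util are assumed from the code above.

import org.apache.spark.rdd.RDD
import scala.io.Source

val keyed: RDD[(String, String)] =
  session.sparkContext
    .parallelize(objectKeys)
    .map(objectKey => (objectKey, objectKey))   // the partition key is the object key itself

// partitionBy groups keys by their day prefix, one partition per day
val partitioned = keyed.partitionBy(new dayPartitioner(objectKeys))

val lines: RDD[String] = partitioned.flatMap { case (objectKey, _) =>
  Source.fromBytes(
    S3Util.getBytes(
      S3Util.getClient(region, S3Util.getCredentialsProvider("INSTANCE", "")),
      bucket, objectKey))
    .getLines()
}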



I'd like to know: do I have to create a custom RDD class to specify my
partitioner and use it with a pattern like the one above?

If so, I'd also like a reference on doing this, to hopefully save myself
some headaches and gotchas from a naive approach. I've found one such
example (https://stackoverflow.com/a/25204589), but it's from an older
version of Spark.

I'm hoping there is something more recent and more in-depth. I
don't mind references to books or other resources.


Best,

Colin Williams

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Custom Partitioner not picked

2016-03-06 Thread Prabhu Joseph
Hi All,

When I submit a Spark job on YARN with a custom partitioner, it is not
picked up by the executors; the executors are still using the default HashPartitioner.
I added logs into both HashPartitioner (org/apache/spark/Partitioner.scala)
and the custom partitioner, and the completed executor logs show only HashPartitioner.

Below is the Spark application code with the custom partitioner, and the log
line that was added into the HashPartitioner class of Partitioner.scala:

 
log.info("HashPartitioner="+key+"---"+numPartitions+""+Utils.nonNegativeMod(key.hashCode,
numPartitions))

The executor logs show:

16/03/06 15:20:27 INFO spark.HashPartitioner: HashPartitioner=INFO---42
16/03/06 15:20:27 INFO spark.HashPartitioner: HashPartitioner=INFO---42


How can I make sure the executors pick the right partitioner?



Code:
package org.apache.spark

class ExactPartitioner(partitions: Int) extends Partitioner with Logging {

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = {

    log.info("ExactPartitioner=" + key)

    key match {
      case "INFO"  => 0
      case "DEBUG" => 1
      case "ERROR" => 2
      case "WARN"  => 3
      case "FATAL" => 4
      // note: this match is non-exhaustive; any other key will throw a MatchError
    }
  }
}

object GroupByCLDB {

def main(args: Array[String]) {

val logFile = "/DATA"

val sparkConf = new SparkConf().setAppName("GroupBy")
sparkConf.set("spark.executor.memory","4g");
sparkConf.set("spark.executor.cores","2");
sparkConf.set("spark.executor.instances","2");

val sc = new SparkContext(sparkConf)
val logData = sc.textFile(logFile)


case class LogClass(one:String,two:String)

def parse(line: String) = {
  val pieces = line.split(' ')
  val level = pieces(2).toString
  val one = pieces(0).toString
  val two = pieces(1).toString
  (level,LogClass(one,two))
  }

val output = logData.map(x => parse(x))

val partitioned = output.partitionBy(new ExactPartitioner(5)).persist()
val groups = partitioned.groupByKey(new ExactPartitioner(5))
groups.count()

output.partitions.size
partitioned.partitions.size

}
}



Thanks,
Prabhu Joseph


Re: map operation clears custom partitioner

2016-02-22 Thread Silvio Fiorito
You can use mapValues to ensure partitioning is not lost.
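
For illustration, a minimal sketch (not from the original message; it uses
HashPartitioner as a stand-in for the custom partitioner and assumes a
SparkContext named sc):

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
  .partitionBy(new HashPartitioner(4))

pairs.partitioner                                    // Some(HashPartitioner)
pairs.map { case (k, v) => (k, v + 1) }.partitioner  // None - map drops the partitioner
pairs.mapValues(_ + 1).partitioner                   // Some(HashPartitioner) - preserved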

From: Brian London <brianmlon...@gmail.com<mailto:brianmlon...@gmail.com>>
Date: Monday, February 22, 2016 at 1:21 PM
To: user <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: map operation clears custom partitioner

It appears that when a custom partitioner is applied in a groupBy operation, it 
is not propagated through subsequent non-shuffle operations.  Is this 
intentional? Is there any way to carry custom partitioning through maps?

I've uploaded a gist that exhibits the behavior. 
https://gist.github.com/BrianLondon/c3c3355d1971971f3ec6


Re: map operation clears custom partitioner

2016-02-22 Thread Sean Owen
The problem is that your new mapped values may be in the wrong
partition, according to your partitioner. Look for methods that have a
preservesPartitioning flag, which is a way to indicate that you know
the partitioning remains correct. (Like, you partition by keys and
didn't change the keys in mapping)
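
A small sketch of that flag (my illustration, not from the original reply;
it assumes rdd is an RDD[(String, String)]):

import org.apache.spark.HashPartitioner

val byKey = rdd.partitionBy(new HashPartitioner(8))

// keys are untouched here, so it is safe to claim the partitioning still holds
val transformed = byKey.mapPartitions(
  iter => iter.map { case (k, v) => (k, v.toUpperCase) },
  preservesPartitioning = true)

transformed.partitioner   // still Some(HashPartitioner); a later join by key avoids a shuffle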

On Mon, Feb 22, 2016 at 6:21 PM, Brian London <brianmlon...@gmail.com> wrote:
> It appears that when a custom partitioner is applied in a groupBy operation,
> it is not propagated through subsequent non-shuffle operations.  Is this
> intentional? Is there any way to carry custom partitioning through maps?
>
> I've uploaded a gist that exhibits the behavior.
> https://gist.github.com/BrianLondon/c3c3355d1971971f3ec6

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



map operation clears custom partitioner

2016-02-22 Thread Brian London
It appears that when a custom partitioner is applied in a groupBy
operation, it is not propagated through subsequent non-shuffle operations.
Is this intentional? Is there any way to carry custom partitioning through
maps?

I've uploaded a gist that exhibits the behavior.
https://gist.github.com/BrianLondon/c3c3355d1971971f3ec6


Re: How to use a custom partitioner in a dataframe in Spark

2016-02-18 Thread Koert Kuipers
Although it is not a bad idea to write data out partitioned and then use a
merge join when reading it back in, this currently isn't even easily doable
with RDDs, because when you read an RDD from disk the partitioning info is
lost. Re-introducing a partitioner at that point causes a shuffle, defeating
the purpose.
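
A sketch of the limitation being described (my illustration; the path, the Event
type, and the SparkContext sc are assumptions):

import org.apache.spark.HashPartitioner

case class Event(userId: String, payload: String)

val byUser = events.partitionBy(new HashPartitioner(64))   // events: RDD[(String, Event)]
byUser.saveAsObjectFile("/tmp/events-by-user")

val reloaded = sc.objectFile[(String, Event)]("/tmp/events-by-user")
reloaded.partitioner                                       // None - partitioning info was not persisted

// re-introducing the partitioner triggers a full shuffle again, defeating the purpose
val repartitioned = reloaded.partitionBy(new HashPartitioner(64))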

On Thu, Feb 18, 2016 at 1:49 PM, Rishi Mishra <rmis...@snappydata.io> wrote:

> Michael,
> Is there any specific reason why DataFrames does not have partitioners
> like RDDs ? This will be very useful if one is writing custom datasources ,
> which keeps data in partitions. While storing data one can pre-partition
> the data at Spark level rather than at the datasource.
>
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>
> On Thu, Feb 18, 2016 at 3:50 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> So suppose I have a bunch of userIds and I need to save them as parquet
>> in database. I also need to load them back and need to be able to do a join
>> on userId. My idea is to partition by userId hashcode first and then on
>> userId. So that I don't have to deal with any performance issues because of
>> a number of small files and also to be able to scan faster.
>>
>>
>> Something like ...df.write.format("parquet").partitionBy( "userIdHash"
>> , "userId").mode(SaveMode.Append).save("userRecords");
>>
>> On Wed, Feb 17, 2016 at 2:16 PM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> So suppose I have a bunch of userIds and I need to save them as parquet
>>> in database. I also need to load them back and need to be able to do a join
>>> on userId. My idea is to partition by userId hashcode first and then on
>>> userId.
>>>
>>>
>>>
>>> On Wed, Feb 17, 2016 at 11:51 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> Can you describe what you are trying to accomplish?  What would the
>>>> custom partitioner be?
>>>>
>>>> On Tue, Feb 16, 2016 at 1:21 PM, SRK <swethakasire...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> How do I use a custom partitioner when I do a saveAsTable in a
>>>>> dataframe.
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Swetha
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-a-custom-partitioner-in-a-dataframe-in-Spark-tp26240.html
>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>>
>>>>> -
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to use a custom partitioner in a dataframe in Spark

2016-02-18 Thread Rishi Mishra
Michael,
Is there any specific reason why DataFrames does not have partitioners like
RDDs ? This will be very useful if one is writing custom datasources ,
which keeps data in partitions. While storing data one can pre-partition
the data at Spark level rather than at the datasource.

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Thu, Feb 18, 2016 at 3:50 AM, swetha kasireddy <swethakasire...@gmail.com
> wrote:

> So suppose I have a bunch of userIds and I need to save them as parquet in
> database. I also need to load them back and need to be able to do a join
> on userId. My idea is to partition by userId hashcode first and then on
> userId. So that I don't have to deal with any performance issues because of
> a number of small files and also to be able to scan faster.
>
>
> Something like ...df.write.format("parquet").partitionBy( "userIdHash"
> , "userId").mode(SaveMode.Append).save("userRecords");
>
> On Wed, Feb 17, 2016 at 2:16 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> So suppose I have a bunch of userIds and I need to save them as parquet
>> in database. I also need to load them back and need to be able to do a join
>> on userId. My idea is to partition by userId hashcode first and then on
>> userId.
>>
>>
>>
>> On Wed, Feb 17, 2016 at 11:51 AM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> Can you describe what you are trying to accomplish?  What would the
>>> custom partitioner be?
>>>
>>> On Tue, Feb 16, 2016 at 1:21 PM, SRK <swethakasire...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> How do I use a custom partitioner when I do a saveAsTable in a
>>>> dataframe.
>>>>
>>>>
>>>> Thanks,
>>>> Swetha
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-a-custom-partitioner-in-a-dataframe-in-Spark-tp26240.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: How to use a custom partitioner in a dataframe in Spark

2016-02-17 Thread swetha kasireddy
So suppose I have a bunch of userIds and I need to save them as Parquet in a
database. I also need to load them back and be able to do a join on userId.
My idea is to partition by userId hashcode first and then on userId, so that
I don't have to deal with performance issues caused by a large number of
small files, and also so I can scan faster.


Something like ... df.write.format("parquet").partitionBy("userIdHash", "userId").mode(SaveMode.Append).save("userRecords")

On Wed, Feb 17, 2016 at 2:16 PM, swetha kasireddy <swethakasire...@gmail.com
> wrote:

> So suppose I have a bunch of userIds and I need to save them as parquet in
> database. I also need to load them back and need to be able to do a join
> on userId. My idea is to partition by userId hashcode first and then on
> userId.
>
>
>
> On Wed, Feb 17, 2016 at 11:51 AM, Michael Armbrust <mich...@databricks.com
> > wrote:
>
>> Can you describe what you are trying to accomplish?  What would the
>> custom partitioner be?
>>
>> On Tue, Feb 16, 2016 at 1:21 PM, SRK <swethakasire...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> How do I use a custom partitioner when I do a saveAsTable in a dataframe.
>>>
>>>
>>> Thanks,
>>> Swetha
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-a-custom-partitioner-in-a-dataframe-in-Spark-tp26240.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: How to use a custom partitioner in a dataframe in Spark

2016-02-17 Thread swetha kasireddy
So suppose I have a bunch of userIds and I need to save them as Parquet in a
database. I also need to load them back and be able to do a join on userId.
My idea is to partition by userId hashcode first and then on userId.



On Wed, Feb 17, 2016 at 11:51 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> Can you describe what you are trying to accomplish?  What would the custom
> partitioner be?
>
> On Tue, Feb 16, 2016 at 1:21 PM, SRK <swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> How do I use a custom partitioner when I do a saveAsTable in a dataframe.
>>
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-a-custom-partitioner-in-a-dataframe-in-Spark-tp26240.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: How to use a custom partitioner in a dataframe in Spark

2016-02-17 Thread Rishi Mishra
Unfortunately there is not any, at least up to 1.5. I have not gone through
the new Dataset API in 1.6. There is some basic support for Parquet, like
partitioning by column.
If you want to partition your dataset in a certain way, you have to use an
RDD to partition it and convert that into a DataFrame before storing it in a table.

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Tue, Feb 16, 2016 at 11:51 PM, SRK <swethakasire...@gmail.com> wrote:

> Hi,
>
> How do I use a custom partitioner when I do a saveAsTable in a dataframe.
>
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-a-custom-partitioner-in-a-dataframe-in-Spark-tp26240.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


How to use a custom partitioner in a dataframe in Spark

2016-02-16 Thread SRK
Hi,

How do I use a custom partitioner when I do a saveAsTable on a DataFrame?


Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-a-custom-partitioner-in-a-dataframe-in-Spark-tp26240.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: python rdd.partionBy(): any examples of a custom partitioner?

2015-12-07 Thread Fengdong Yu
Refer here:
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

See the section "Example 4-27. Python custom partitioner".




> On Dec 8, 2015, at 10:07 AM, Keith Freeman <8fo...@gmail.com> wrote:
> 
> I'm not a python expert, so I'm wondering if anybody has a working example of 
> a partitioner for the "partitionFunc" argument (default "portable_hash") to 
> rdd.partitionBy()?
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 



python rdd.partionBy(): any examples of a custom partitioner?

2015-12-07 Thread Keith Freeman
I'm not a python expert, so I'm wondering if anybody has a working 
example of a partitioner for the "partitionFunc" argument (default 
"portable_hash") to rdd.partitionBy()?


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Does using Custom Partitioner before calling reduceByKey improve performance?

2015-10-27 Thread swetha
Hi,

We currently use reduceByKey to reduce by a particular metric name in our
streaming/batch job. It seems to be doing a lot of shuffles, and that has an
impact on performance. Does using a custom partitioner before calling
reduceByKey improve performance?


Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-using-Custom-Partitioner-before-calling-reduceByKey-improve-performance-tp25214.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does using Custom Partitioner before calling reduceByKey improve performance?

2015-10-27 Thread Tathagata Das
If you just want to control the number of reducers, then setting the
numPartitions is sufficient. If you want to control the exact partitioning
scheme (that is, some scheme other than hash-based), then you need to
implement a custom partitioner. It can be used to mitigate data skew, etc.,
which ultimately improves performance.
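
As a sketch of the two options (mine, not from the original reply; assumes a
pair RDD of metric name to value called metrics: RDD[(String, Double)]):

// 1) only control the number of reducers
val reducedA = metrics.reduceByKey(_ + _, 48)

// 2) control the partitioning scheme itself with a custom partitioner
import org.apache.spark.Partitioner

class MetricPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int =
    math.abs(key.asInstanceOf[String].hashCode % numPartitions)   // replace with any scheme
}

val reducedB = metrics.reduceByKey(new MetricPartitioner(48), _ + _)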

On Tue, Oct 27, 2015 at 1:20 PM, swetha <swethakasire...@gmail.com> wrote:

> Hi,
>
> We currently use reduceByKey to reduce by a particular metric name in our
> Streaming/Batch job. It seems to be doing a lot of shuffles and it has
> impact on performance. Does using a custompartitioner before calling
> reduceByKey improve performance?
>
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Does-using-Custom-Partitioner-before-calling-reduceByKey-improve-performance-tp25214.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Does using Custom Partitioner before calling reduceByKey improve performance?

2015-10-27 Thread swetha kasireddy
So, wouldn't using a custom partitioner on the RDD upon which the
groupByKey or reduceByKey is performed avoid shuffles and improve
performance? My code does group-by-and-sort and reduceByKey on different
datasets, as shown below. Would using a custom partitioner on those datasets
before the groupByKey or reduceByKey improve performance? My idea is
to avoid shuffles and improve performance. Also, right now I see a lot of
spills when there is a very large dataset for groupByKey and reduceByKey; I
think the memory is not sufficient. We need to group by sessionId and then
sort the JSONs based on the timestamp, as shown in the code below.


What is the alternative to using groupByKey for better performance? And in
the case of reduceByKey, would using a custom partitioner on the RDD upon which
the reduceByKey is performed reduce the shuffles and improve
performance?


rdd.partitionBy(customPartitioner)

def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String, List[(Long, String)])] = {
  val grpdRecs = rdd.groupByKey()
  val srtdRecs = grpdRecs.mapValues[List[(Long, String)]](iter => iter.toList.sortBy(_._1))
  srtdRecs
}

rdd.reduceByKey((a, b) => {
  (Math.max(a._1, b._1), (a._2 ++ b._2))
})



On Tue, Oct 27, 2015 at 5:07 PM, Tathagata Das <t...@databricks.com> wrote:

> If you just want to control the number of reducers, then setting the
> numPartitions is sufficient. If you want to control how exact partitioning
> scheme (that is some other scheme other than hash-based) then you need to
> implement a custom partitioner. It can be used to improve data skews, etc.
> which ultimately improves performance.
>
> On Tue, Oct 27, 2015 at 1:20 PM, swetha <swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> We currently use reduceByKey to reduce by a particular metric name in our
>> Streaming/Batch job. It seems to be doing a lot of shuffles and it has
>> impact on performance. Does using a custompartitioner before calling
>> reduceByKey improve performance?
>>
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Does-using-Custom-Partitioner-before-calling-reduceByKey-improve-performance-tp25214.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Does using Custom Partitioner before calling reduceByKey improve performance?

2015-10-27 Thread Tathagata Das
If you specify the same partitioner (custom or otherwise) for both
partitionBy and groupBy, then maybe it will help. The fundamental problem
is groupByKey, which takes a lot of working memory.
1. Try to avoid groupByKey. What is it that you want to do after sorting the
list of grouped events? Can you do that operation with a reduceByKey? (See
the sketch below for one possible shape.)
2. If not, use more partitions. That would mean less data in each
partition, so less spilling.
3. You can control the amount of memory allocated for shuffles by changing the
configuration spark.shuffle.memoryFraction. A larger fraction would cause less
spilling.
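
One possible shape for point 1 in this thread's "group by sessionId, sort by
timestamp" case - this is my sketch, not code from the original reply - is
repartitionAndSortWithinPartitions with a partitioner that looks only at the
sessionId part of a composite key (assumes rdd: RDD[(String, (Long, String))]
keyed by sessionId, as in the code quoted below):

import org.apache.spark.Partitioner

class SessionPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = {
    val (sessionId, _) = key.asInstanceOf[(String, Long)]
    math.abs(sessionId.hashCode % numPartitions)   // partition on sessionId only
  }
}

// move the timestamp into a composite key; tuple ordering sorts by sessionId, then timestamp
val bySessionAndTime = rdd.map { case (sessionId, (ts, json)) => ((sessionId, ts), json) }

val sorted = bySessionAndTime
  .repartitionAndSortWithinPartitions(new SessionPartitioner(200))

// each session's records are now contiguous and time-ordered within a partition, so
// they can be streamed with mapPartitions instead of materializing a List per session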


On Tue, Oct 27, 2015 at 6:35 PM, swetha kasireddy <swethakasire...@gmail.com
> wrote:

> So, Wouldn't  using a customPartitioner on the rdd upon which  the
> groupByKey  or reduceByKey is performed avoid shuffles and improve
> performance? My code does groupByAndSort and reduceByKey on different
> datasets as shown below. Would using a custom partitioner on those datasets
> before using a  groupByKey or reduceByKey improve performance? My idea is
> to avoid shuffles and improve performance. Also, right now I see a lot of
> spills when there is a very large dataset for groupByKey and reduceByKey. I
> think the memory is not sufficient. We need to group by sessionId and then
> sort the Jsons based on the timeStamp as shown in the below code.
>
>
> What is the alternative to using groupByKey for better performance? And in
> case of reduceByKey, would using a customPartitioner on the RDD upon which
> the reduceByKey is performed would reduce the shuffles and improve the
> performance?
>
>
> rdd.partitionBy(customPartitioner)
>
> def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String,
> List[(Long, String)])] =
> { val grpdRecs = rdd.groupByKey(); val srtdRecs =
> grpdRecs.mapValues[(List[(Long, String)])](iter =>
> iter.toList.sortBy(_._1)) srtdRecs }
>
> rdd.reduceByKey((a, b) => {
>   (Math.max(a._1, b._1), (a._2 ++ b._2))
> })
>
>
>
> On Tue, Oct 27, 2015 at 5:07 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> If you just want to control the number of reducers, then setting the
>> numPartitions is sufficient. If you want to control how exact partitioning
>> scheme (that is some other scheme other than hash-based) then you need to
>> implement a custom partitioner. It can be used to improve data skews, etc.
>> which ultimately improves performance.
>>
>> On Tue, Oct 27, 2015 at 1:20 PM, swetha <swethakasire...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We currently use reduceByKey to reduce by a particular metric name in our
>>> Streaming/Batch job. It seems to be doing a lot of shuffles and it has
>>> impact on performance. Does using a custompartitioner before calling
>>> reduceByKey improve performance?
>>>
>>>
>>> Thanks,
>>> Swetha
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Does-using-Custom-Partitioner-before-calling-reduceByKey-improve-performance-tp25214.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Does using Custom Partitioner before calling reduceByKey improve performance?

2015-10-27 Thread Tathagata Das
If it is streaming, you can look at updateStateByKey for maintaining active
sessions, but that won't work for batch.

And I answered that before: it can improve performance if you change the
partitioning scheme from hash-based to something else. It's hard to say
anything beyond that without understanding the data skew and other details of
your application. Before jumping into that, you should simply change the
number of partitions and see if the performance improves.



On Tue, Oct 27, 2015 at 7:10 PM, swetha kasireddy <swethakasire...@gmail.com
> wrote:

> After sorting the list of grouped events I would need to have an RDD that
> has a key which is nothing but the  sessionId and a list of values that are
> sorted by timeStamp for each input Json. So basically the return type would
> be RDD[(String, List[(Long, String)]  where the key is the sessionId and
>  a list of tuples that has a timeStamp and Json as the values. I will need
> to use groupByKey to do a groupBy sessionId and secondary sort by timeStamp
> and then get the list of JsonValues in a sorted order. Is there any
> alternative for that? Please find the code below that I used for the same.
>
>
> Also, does using a customPartitioner for a reduceByKey improve performance?
>
>
> def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String,
> List[(Long, String)])] =
> { val grpdRecs = rdd.groupByKey(); val srtdRecs =
> grpdRecs.mapValues[(List[(Long, String)])](iter =>
> iter.toList.sortBy(_._1)) srtdRecs }
>
>
> On Tue, Oct 27, 2015 at 6:47 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> if you specify the same partitioner (custom or otherwise) for both
>> partitionBy and groupBy, then may be it will help. The fundamental problem
>> is groupByKey, that takes a lot of working memory.
>> 1. Try to avoid groupByKey. What is it that you want to after sorting the
>> list of grouped events? can you do that operation with a reduceByKey?
>> 2. If not, use more partitions. That would cause lesser data in each
>> partition, so less spilling.
>> 3. You can control the amount memory allocated for shuffles by changing
>> the configuration spark.shuffle.memoryFraction . More fraction would cause
>> less spilling.
>>
>>
>> On Tue, Oct 27, 2015 at 6:35 PM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> So, Wouldn't  using a customPartitioner on the rdd upon which  the
>>> groupByKey  or reduceByKey is performed avoid shuffles and improve
>>> performance? My code does groupByAndSort and reduceByKey on different
>>> datasets as shown below. Would using a custom partitioner on those datasets
>>> before using a  groupByKey or reduceByKey improve performance? My idea is
>>> to avoid shuffles and improve performance. Also, right now I see a lot of
>>> spills when there is a very large dataset for groupByKey and reduceByKey. I
>>> think the memory is not sufficient. We need to group by sessionId and then
>>> sort the Jsons based on the timeStamp as shown in the below code.
>>>
>>>
>>> What is the alternative to using groupByKey for better performance? And
>>> in case of reduceByKey, would using a customPartitioner on the RDD upon
>>> which the reduceByKey is performed would reduce the shuffles and improve
>>> the performance?
>>>
>>>
>>> rdd.partitionBy(customPartitioner)
>>>
>>> def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]):
>>> RDD[(String, List[(Long, String)])] =
>>> { val grpdRecs = rdd.groupByKey(); val srtdRecs =
>>> grpdRecs.mapValues[(List[(Long, String)])](iter =>
>>> iter.toList.sortBy(_._1)) srtdRecs }
>>>
>>> rdd.reduceByKey((a, b) => {
>>>   (Math.max(a._1, b._1), (a._2 ++ b._2))
>>> })
>>>
>>>
>>>
>>> On Tue, Oct 27, 2015 at 5:07 PM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> If you just want to control the number of reducers, then setting the
>>>> numPartitions is sufficient. If you want to control how exact partitioning
>>>> scheme (that is some other scheme other than hash-based) then you need to
>>>> implement a custom partitioner. It can be used to improve data skews, etc.
>>>> which ultimately improves performance.
>>>>
>>>> On Tue, Oct 27, 2015 at 1:20 PM, swetha <swethakasire...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We currently use reduceByKey to reduce by a particular metric name in
>>>>> our
>>>>> Streamin

Re: Does using Custom Partitioner before calling reduceByKey improve performance?

2015-10-27 Thread swetha kasireddy
After sorting the list of grouped events, I would need to have an RDD whose
key is nothing but the sessionId and whose values are a list sorted by
timestamp for each input JSON. So basically the return type would be
RDD[(String, List[(Long, String)])], where the key is the sessionId and the
value is a list of tuples holding a timestamp and a JSON string. I will need
to use groupByKey to group by sessionId, do a secondary sort by timestamp,
and then get the list of JSON values in sorted order. Is there any
alternative to that? Please find the code I used for this below.


Also, does using a custom partitioner for a reduceByKey improve performance?


def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String, List[(Long, String)])] = {
  val grpdRecs = rdd.groupByKey()
  val srtdRecs = grpdRecs.mapValues[List[(Long, String)]](iter => iter.toList.sortBy(_._1))
  srtdRecs
}


On Tue, Oct 27, 2015 at 6:47 PM, Tathagata Das <t...@databricks.com> wrote:

> if you specify the same partitioner (custom or otherwise) for both
> partitionBy and groupBy, then may be it will help. The fundamental problem
> is groupByKey, that takes a lot of working memory.
> 1. Try to avoid groupByKey. What is it that you want to after sorting the
> list of grouped events? can you do that operation with a reduceByKey?
> 2. If not, use more partitions. That would cause lesser data in each
> partition, so less spilling.
> 3. You can control the amount memory allocated for shuffles by changing
> the configuration spark.shuffle.memoryFraction . More fraction would cause
> less spilling.
>
>
> On Tue, Oct 27, 2015 at 6:35 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> So, Wouldn't  using a customPartitioner on the rdd upon which  the
>> groupByKey  or reduceByKey is performed avoid shuffles and improve
>> performance? My code does groupByAndSort and reduceByKey on different
>> datasets as shown below. Would using a custom partitioner on those datasets
>> before using a  groupByKey or reduceByKey improve performance? My idea is
>> to avoid shuffles and improve performance. Also, right now I see a lot of
>> spills when there is a very large dataset for groupByKey and reduceByKey. I
>> think the memory is not sufficient. We need to group by sessionId and then
>> sort the Jsons based on the timeStamp as shown in the below code.
>>
>>
>> What is the alternative to using groupByKey for better performance? And
>> in case of reduceByKey, would using a customPartitioner on the RDD upon
>> which the reduceByKey is performed would reduce the shuffles and improve
>> the performance?
>>
>>
>> rdd.partitionBy(customPartitioner)
>>
>> def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String,
>> List[(Long, String)])] =
>> { val grpdRecs = rdd.groupByKey(); val srtdRecs =
>> grpdRecs.mapValues[(List[(Long, String)])](iter =>
>> iter.toList.sortBy(_._1)) srtdRecs }
>>
>> rdd.reduceByKey((a, b) => {
>>   (Math.max(a._1, b._1), (a._2 ++ b._2))
>> })
>>
>>
>>
>> On Tue, Oct 27, 2015 at 5:07 PM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> If you just want to control the number of reducers, then setting the
>>> numPartitions is sufficient. If you want to control how exact partitioning
>>> scheme (that is some other scheme other than hash-based) then you need to
>>> implement a custom partitioner. It can be used to improve data skews, etc.
>>> which ultimately improves performance.
>>>
>>> On Tue, Oct 27, 2015 at 1:20 PM, swetha <swethakasire...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> We currently use reduceByKey to reduce by a particular metric name in
>>>> our
>>>> Streaming/Batch job. It seems to be doing a lot of shuffles and it has
>>>> impact on performance. Does using a custompartitioner before calling
>>>> reduceByKey improve performance?
>>>>
>>>>
>>>> Thanks,
>>>> Swetha
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Does-using-Custom-Partitioner-before-calling-reduceByKey-improve-performance-tp25214.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: Custom Partitioner

2015-09-02 Thread Jem Tucker
Alter the range partitioner so that it skews the partitioning and assigns
more partitions to the more heavily weighted keys? To do this you will have to
know the weighting before you start.

On Wed, Sep 2, 2015 at 8:02 AM shahid ashraf <sha...@trialx.com> wrote:

> yes i can take as an example , but my actual use case is that in need to
> resolve a data skew, when i do grouping based on key(A-Z) the resulting
> partitions are skewed like
> (partition no.,no_of_keys, total elements with given key)
> << partition: [(0, 0, 0), (1, 15, 17395), (2, 0, 0), (3, 0, 0), (4, 13,
> 18196), (5, 0, 0), (6, 0, 0), (7, 0, 0), (8, 1, 1), (9, 0, 0)] and
> elements: >>
> the data has been skewed to partition 1 and 4, i need to split the
> partition. and do processing on split partitions and i should be able to
> combine splitted partition back also.
>
> On Tue, Sep 1, 2015 at 10:42 PM, Davies Liu <dav...@databricks.com> wrote:
>
>> You can take the sortByKey as example:
>> https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L642
>>
>> On Tue, Sep 1, 2015 at 3:48 AM, Jem Tucker <jem.tuc...@gmail.com> wrote:
>> > something like...
>> >
>> > class RangePartitioner(Partitioner):
>> > def __init__(self, numParts):
>> > self.numPartitions = numParts
>> > self.partitionFunction = rangePartition
>> > def rangePartition(key):
>> > # Logic to turn key into a partition id
>> > return id
>> >
>> > On Tue, Sep 1, 2015 at 11:38 AM shahid ashraf <sha...@trialx.com>
>> wrote:
>> >>
>> >> Hi
>> >>
>> >> I think range partitioner is not available in pyspark, so if we want
>> >> create one. how should we create that. my question is that.
>> >>
>> >> On Tue, Sep 1, 2015 at 3:57 PM, Jem Tucker <jem.tuc...@gmail.com>
>> wrote:
>> >>>
>> >>> Ah sorry I miss read your question. In pyspark it looks like you just
>> >>> need to instantiate the Partitioner class with numPartitions and
>> >>> partitionFunc.
>> >>>
>> >>> On Tue, Sep 1, 2015 at 11:13 AM shahid ashraf <sha...@trialx.com>
>> wrote:
>> >>>>
>> >>>> Hi
>> >>>>
>> >>>> I did not get this, e.g if i need to create a custom partitioner like
>> >>>> range partitioner.
>> >>>>
>> >>>> On Tue, Sep 1, 2015 at 3:22 PM, Jem Tucker <jem.tuc...@gmail.com>
>> wrote:
>> >>>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> You just need to extend Partitioner and override the numPartitions
>> and
>> >>>>> getPartition methods, see below
>> >>>>>
>> >>>>> class MyPartitioner extends partitioner {
>> >>>>>   def numPartitions: Int = // Return the number of partitions
>> >>>>>   def getPartition(key Any): Int = // Return the partition for a
>> given
>> >>>>> key
>> >>>>> }
>> >>>>>
>> >>>>> On Tue, Sep 1, 2015 at 10:15 AM shahid qadri <
>> shahidashr...@icloud.com>
>> >>>>> wrote:
>> >>>>>>
>> >>>>>> Hi Sparkians
>> >>>>>>
>> >>>>>> How can we create a customer partition in pyspark
>> >>>>>>
>> >>>>>>
>> -
>> >>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>> >>>>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>> --
>> >>>> with Regards
>> >>>> Shahid Ashraf
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> with Regards
>> >> Shahid Ashraf
>>
>
>
>
> --
> with Regards
> Shahid Ashraf
>


Re: Custom Partitioner

2015-09-02 Thread shahid ashraf
Yes, I can take that as an example, but my actual use case is that I need to
resolve a data skew. When I do grouping based on key (A-Z), the resulting
partitions are skewed like
(partition no., no_of_keys, total elements with given key)
<< partition: [(0, 0, 0), (1, 15, 17395), (2, 0, 0), (3, 0, 0), (4, 13,
18196), (5, 0, 0), (6, 0, 0), (7, 0, 0), (8, 1, 1), (9, 0, 0)] and
elements: >>
The data has been skewed to partitions 1 and 4. I need to split those
partitions, do processing on the split partitions, and then be able to
combine the split partitions back together as well.

On Tue, Sep 1, 2015 at 10:42 PM, Davies Liu <dav...@databricks.com> wrote:

> You can take the sortByKey as example:
> https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L642
>
> On Tue, Sep 1, 2015 at 3:48 AM, Jem Tucker <jem.tuc...@gmail.com> wrote:
> > something like...
> >
> > class RangePartitioner(Partitioner):
> > def __init__(self, numParts):
> > self.numPartitions = numParts
> > self.partitionFunction = rangePartition
> > def rangePartition(key):
> > # Logic to turn key into a partition id
> > return id
> >
> > On Tue, Sep 1, 2015 at 11:38 AM shahid ashraf <sha...@trialx.com> wrote:
> >>
> >> Hi
> >>
> >> I think range partitioner is not available in pyspark, so if we want
> >> create one. how should we create that. my question is that.
> >>
> >> On Tue, Sep 1, 2015 at 3:57 PM, Jem Tucker <jem.tuc...@gmail.com>
> wrote:
> >>>
> >>> Ah sorry I miss read your question. In pyspark it looks like you just
> >>> need to instantiate the Partitioner class with numPartitions and
> >>> partitionFunc.
> >>>
> >>> On Tue, Sep 1, 2015 at 11:13 AM shahid ashraf <sha...@trialx.com>
> wrote:
> >>>>
> >>>> Hi
> >>>>
> >>>> I did not get this, e.g if i need to create a custom partitioner like
> >>>> range partitioner.
> >>>>
> >>>> On Tue, Sep 1, 2015 at 3:22 PM, Jem Tucker <jem.tuc...@gmail.com>
> wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> You just need to extend Partitioner and override the numPartitions
> and
> >>>>> getPartition methods, see below
> >>>>>
> >>>>> class MyPartitioner extends partitioner {
> >>>>>   def numPartitions: Int = // Return the number of partitions
> >>>>>   def getPartition(key Any): Int = // Return the partition for a
> given
> >>>>> key
> >>>>> }
> >>>>>
> >>>>> On Tue, Sep 1, 2015 at 10:15 AM shahid qadri <
> shahidashr...@icloud.com>
> >>>>> wrote:
> >>>>>>
> >>>>>> Hi Sparkians
> >>>>>>
> >>>>>> How can we create a customer partition in pyspark
> >>>>>>
> >>>>>>
> -
> >>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >>>>>> For additional commands, e-mail: user-h...@spark.apache.org
> >>>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> with Regards
> >>>> Shahid Ashraf
> >>
> >>
> >>
> >>
> >> --
> >> with Regards
> >> Shahid Ashraf
>



-- 
with Regards
Shahid Ashraf


Re: Custom Partitioner

2015-09-01 Thread Davies Liu
You can take the sortByKey as example:
https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L642

On Tue, Sep 1, 2015 at 3:48 AM, Jem Tucker <jem.tuc...@gmail.com> wrote:
> something like...
>
> class RangePartitioner(Partitioner):
> def __init__(self, numParts):
> self.numPartitions = numParts
> self.partitionFunction = rangePartition
> def rangePartition(key):
> # Logic to turn key into a partition id
> return id
>
> On Tue, Sep 1, 2015 at 11:38 AM shahid ashraf <sha...@trialx.com> wrote:
>>
>> Hi
>>
>> I think range partitioner is not available in pyspark, so if we want
>> create one. how should we create that. my question is that.
>>
>> On Tue, Sep 1, 2015 at 3:57 PM, Jem Tucker <jem.tuc...@gmail.com> wrote:
>>>
>>> Ah sorry I miss read your question. In pyspark it looks like you just
>>> need to instantiate the Partitioner class with numPartitions and
>>> partitionFunc.
>>>
>>> On Tue, Sep 1, 2015 at 11:13 AM shahid ashraf <sha...@trialx.com> wrote:
>>>>
>>>> Hi
>>>>
>>>> I did not get this, e.g if i need to create a custom partitioner like
>>>> range partitioner.
>>>>
>>>> On Tue, Sep 1, 2015 at 3:22 PM, Jem Tucker <jem.tuc...@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> You just need to extend Partitioner and override the numPartitions and
>>>>> getPartition methods, see below
>>>>>
>>>>> class MyPartitioner extends partitioner {
>>>>>   def numPartitions: Int = // Return the number of partitions
>>>>>   def getPartition(key Any): Int = // Return the partition for a given
>>>>> key
>>>>> }
>>>>>
>>>>> On Tue, Sep 1, 2015 at 10:15 AM shahid qadri <shahidashr...@icloud.com>
>>>>> wrote:
>>>>>>
>>>>>> Hi Sparkians
>>>>>>
>>>>>> How can we create a customer partition in pyspark
>>>>>>
>>>>>> -
>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> with Regards
>>>> Shahid Ashraf
>>
>>
>>
>>
>> --
>> with Regards
>> Shahid Ashraf

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Custom Partitioner

2015-09-01 Thread Jem Tucker
Hi,

You just need to extend Partitioner and override the numPartitions and
getPartition methods, see below

class MyPartitioner extends Partitioner {
  def numPartitions: Int = // Return the number of partitions
  def getPartition(key: Any): Int = // Return the partition for a given key
}

On Tue, Sep 1, 2015 at 10:15 AM shahid qadri 
wrote:

> Hi Sparkians
>
> How can we create a customer partition in pyspark
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Custom Partitioner

2015-09-01 Thread Jem Tucker
Ah sorry, I misread your question. In pyspark it looks like you just need
to instantiate the Partitioner class with numPartitions and partitionFunc.

On Tue, Sep 1, 2015 at 11:13 AM shahid ashraf <sha...@trialx.com> wrote:

> Hi
>
> I did not get this, e.g if i need to create a custom partitioner like
> range partitioner.
>
> On Tue, Sep 1, 2015 at 3:22 PM, Jem Tucker <jem.tuc...@gmail.com> wrote:
>
>> Hi,
>>
>> You just need to extend Partitioner and override the numPartitions and
>> getPartition methods, see below
>>
>> class MyPartitioner extends partitioner {
>>   def numPartitions: Int = // Return the number of partitions
>>   def getPartition(key Any): Int = // Return the partition for a given key
>> }
>>
>> On Tue, Sep 1, 2015 at 10:15 AM shahid qadri <shahidashr...@icloud.com>
>> wrote:
>>
>>> Hi Sparkians
>>>
>>> How can we create a customer partition in pyspark
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>
>
> --
> with Regards
> Shahid Ashraf
>


Custom Partitioner

2015-09-01 Thread shahid qadri
Hi Sparkians

How can we create a custom partitioner in pyspark?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Custom Partitioner

2015-09-01 Thread shahid ashraf
Hi

I did not get this. E.g., what if I need to create a custom partitioner like a range
partitioner?

On Tue, Sep 1, 2015 at 3:22 PM, Jem Tucker <jem.tuc...@gmail.com> wrote:

> Hi,
>
> You just need to extend Partitioner and override the numPartitions and
> getPartition methods, see below
>
> class MyPartitioner extends partitioner {
>   def numPartitions: Int = // Return the number of partitions
>   def getPartition(key Any): Int = // Return the partition for a given key
> }
>
> On Tue, Sep 1, 2015 at 10:15 AM shahid qadri <shahidashr...@icloud.com>
> wrote:
>
>> Hi Sparkians
>>
>> How can we create a customer partition in pyspark
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


-- 
with Regards
Shahid Ashraf


Re: Custom Partitioner

2015-09-01 Thread shahid ashraf
Hi

I think a range partitioner is not available in pyspark, so if we want to create
one, how should we create it? That is my question.

On Tue, Sep 1, 2015 at 3:57 PM, Jem Tucker <jem.tuc...@gmail.com> wrote:

> Ah sorry I miss read your question. In pyspark it looks like you just need
> to instantiate the Partitioner class with numPartitions and partitionFunc.
>
> On Tue, Sep 1, 2015 at 11:13 AM shahid ashraf <sha...@trialx.com> wrote:
>
>> Hi
>>
>> I did not get this, e.g if i need to create a custom partitioner like
>> range partitioner.
>>
>> On Tue, Sep 1, 2015 at 3:22 PM, Jem Tucker <jem.tuc...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> You just need to extend Partitioner and override the numPartitions and
>>> getPartition methods, see below
>>>
>>> class MyPartitioner extends partitioner {
>>>   def numPartitions: Int = // Return the number of partitions
>>>   def getPartition(key Any): Int = // Return the partition for a given
>>> key
>>> }
>>>
>>> On Tue, Sep 1, 2015 at 10:15 AM shahid qadri <shahidashr...@icloud.com>
>>> wrote:
>>>
>>>> Hi Sparkians
>>>>
>>>> How can we create a customer partition in pyspark
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>
>>
>> --
>> with Regards
>> Shahid Ashraf
>>
>


-- 
with Regards
Shahid Ashraf


Re: Custom Partitioner

2015-09-01 Thread Jem Tucker
something like...

class RangePartitioner(Partitioner):
    def __init__(self, numParts):
        self.numPartitions = numParts
        self.partitionFunction = rangePartition

    def rangePartition(key):
        # Logic to turn key into a partition id
        return id

On Tue, Sep 1, 2015 at 11:38 AM shahid ashraf <sha...@trialx.com> wrote:

> Hi
>
> I think range partitioner is not available in pyspark, so if we want
> create one. how should we create that. my question is that.
>
> On Tue, Sep 1, 2015 at 3:57 PM, Jem Tucker <jem.tuc...@gmail.com> wrote:
>
>> Ah sorry I miss read your question. In pyspark it looks like you just
>> need to instantiate the Partitioner class with numPartitions and
>> partitionFunc.
>>
>> On Tue, Sep 1, 2015 at 11:13 AM shahid ashraf <sha...@trialx.com> wrote:
>>
>>> Hi
>>>
>>> I did not get this, e.g if i need to create a custom partitioner like
>>> range partitioner.
>>>
>>> On Tue, Sep 1, 2015 at 3:22 PM, Jem Tucker <jem.tuc...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> You just need to extend Partitioner and override the numPartitions and
>>>> getPartition methods, see below
>>>>
>>>> class MyPartitioner extends partitioner {
>>>>   def numPartitions: Int = // Return the number of partitions
>>>>   def getPartition(key Any): Int = // Return the partition for a given
>>>> key
>>>> }
>>>>
>>>> On Tue, Sep 1, 2015 at 10:15 AM shahid qadri <shahidashr...@icloud.com>
>>>> wrote:
>>>>
>>>>> Hi Sparkians
>>>>>
>>>>> How can we create a customer partition in pyspark
>>>>>
>>>>> -
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>
>>>>>
>>>
>>>
>>> --
>>> with Regards
>>> Shahid Ashraf
>>>
>>
>
>
> --
> with Regards
> Shahid Ashraf
>


Custom partitioner

2015-07-26 Thread Hafiz Mujadid
Hi

I have CSV data with a date-time column. I want to partition my data into 12
partitions, with each partition containing data for one month only. I am not
sure how to write such a partitioner, or how to use that partitioner to read
and write data.

Kindly help me in this regard.

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-partitioner-tp24001.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Custom partitioner

2015-07-26 Thread Ted Yu
You can write a subclass of Partitioner whose getPartition() returns the
partition number corresponding to the given key.

Take a look at
core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala for
an example.
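
For the 12-month use case above, a minimal sketch of that approach (mine, not
from the original reply; the column position and date format are assumptions):

import org.apache.spark.Partitioner

class MonthPartitioner extends Partitioner {
  def numPartitions: Int = 12
  def getPartition(key: Any): Int = key.asInstanceOf[Int] - 1   // months 1..12 -> partitions 0..11
}

// assumes the date-time is the first CSV column, formatted like 2015-07-26 13:45:00
val byMonth = sc.textFile("data.csv")
  .map { line =>
    val month = line.split(",")(0).substring(5, 7).toInt
    (month, line)
  }
  .partitionBy(new MonthPartitioner)

byMonth.values.saveAsTextFile("data-by-month")   // writes one part file per month partition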

Cheers

On Sun, Jul 26, 2015 at 1:43 PM, Hafiz Mujadid hafizmujadi...@gmail.com
wrote:

 Hi

 I have csv data in which i have a column of date time. I want to partition
 my data in 12 partitions with each partition containing data of one month
 only. I am not getting how to write such partitioner and how to use that
 partitioner to read write data.

 Kindly help me in this regard.

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Custom-partitioner-tp24001.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Python Custom Partitioner

2015-05-04 Thread ayan guha
Hi

Can someone share some working code for custom partitioner in python?

I am trying to understand it better.

Here is documentation

partitionBy(numPartitions, partitionFunc=<function portable_hash>)
https://spark.apache.org/docs/1.3.1/api/python/pyspark.html#pyspark.RDD.partitionBy

Return a copy of the RDD partitioned using the specified partitioner.


what I am trying to do -

1. Create a dataframe

2. Partition it using one specific column

3. create another dataframe

4. partition it on the same column

5. join (to enforce map-side join)

My question:

a) Am I on the right path?

b) How can I do the partitionBy? Specifically, when I call DF.rdd.partitionBy,
what gets passed to the custom function? A tuple? A row? How do I access, say,
the 3rd column of a tuple inside the partitioner function?

-- 
Best Regards,
Ayan Guha


Re: Python Custom Partitioner

2015-05-04 Thread ๏̯͡๏
I have implemented map-side join with broadcast variables and the code is
on mailing list (scala).


On Mon, May 4, 2015 at 8:38 PM, ayan guha guha.a...@gmail.com wrote:

 Hi

 Can someone share some working code for custom partitioner in python?

 I am trying to understand it better.

 Here is documentation

 partitionBy(*numPartitions*, *partitionFunc=function portable_hash at
 0x2c45140*)
 https://spark.apache.org/docs/1.3.1/api/python/pyspark.html#pyspark.RDD.partitionBy

 Return a copy of the RDD partitioned using the specified partitioner.


 what I am trying to do -

 1. Create a dataframe

 2. Partition it using one specific column

 3. create another dataframe

 4. partition it on the same column

 5. join (to enforce map-side join)

 My question:

 a) Am I on right path?

 b) How can I do partitionby? Specifically, when I call DF.rdd.partitionBy,
 what gets passed to the custom function? tuple? row? how to access (say 3rd
 column of a tuple inside partitioner function)?

 --
 Best Regards,
 Ayan Guha




-- 
Deepak


Re: Python Custom Partitioner

2015-05-04 Thread ayan guha
Thanks, but is there a non-broadcast solution?
On 5 May 2015 01:34, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I have implemented map-side join with broadcast variables and the code is
 on mailing list (scala).


 On Mon, May 4, 2015 at 8:38 PM, ayan guha guha.a...@gmail.com wrote:

 Hi

 Can someone share some working code for custom partitioner in python?

 I am trying to understand it better.

 Here is documentation

 partitionBy(*numPartitions*, *partitionFunc=function portable_hash at
 0x2c45140*)
 https://spark.apache.org/docs/1.3.1/api/python/pyspark.html#pyspark.RDD.partitionBy

 Return a copy of the RDD partitioned using the specified partitioner.


 what I am trying to do -

 1. Create a dataframe

 2. Partition it using one specific column

 3. create another dataframe

 4. partition it on the same column

 5. join (to enforce map-side join)

 My question:

 a) Am I on right path?

 b) How can I do partitionby? Specifically, when I call
 DF.rdd.partitionBy, what gets passed to the custom function? tuple? row?
 how to access (say 3rd column of a tuple inside partitioner function)?

 --
 Best Regards,
 Ayan Guha




 --
 Deepak