Re: Kafka Streaming and partitioning

2017-02-26 Thread tonyye
Hi Dave,
I had the same question and was wondering if you had found a way to do the
join without causing a shuffle?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Streaming-and-partitioning-tp25955p28425.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka Streaming and partitioning

2016-01-13 Thread David D
Yep, that's exactly what we want. Thanks for all the info, Cody.
Dave.

Re: Kafka Streaming and partitioning

2016-01-13 Thread Cody Koeninger
The idea here is that the custom partitioner shouldn't actually get used
for repartitioning the kafka stream (because that would involve a shuffle,
which is what you're trying to avoid).  You're just assigning a partitioner
because you know how it already is partitioned.
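
For illustration only, "assigning a partitioner you already know" can be as little as an org.apache.spark.Partitioner that mirrors however the producers key the topic. A rough sketch, assuming (hypothetically) that the producer chooses partitions by hashing the message key modulo the partition count:

import org.apache.spark.Partitioner

// Sketch only: mirrors an assumed producer-side scheme
// (hash of the key modulo the partition count, ignoring null keys for brevity).
class KafkaLikePartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    val h = key.hashCode % partitions
    if (h < 0) h + partitions else h   // keep the bucket non-negative
  }
}

The only requirement is that getPartition returns, for every key, the partition index the producer already used; if the stream side and the reference side build separate instances, also override equals and hashCode so Spark recognises them as the same partitioner.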



Re: Kafka Streaming and partitioning

2016-01-13 Thread Dave

So for case 1 ("subclass or modify the direct stream and kafkardd.  They're
private, so you'd need to rebuild just the external kafka project, not all of
spark"): when the data is read from Kafka it will be partitioned correctly
with the custom partitioner passed in to the new direct stream and kafka RDD
implementations.


For case 2 ("write a wrapper subclass of rdd that takes a given custom
partitioner and rdd in the constructor, overrides partitioner, and delegates
every other method to the wrapped rdd.  This should be possible without
modification to any existing spark code.  You'd use it something like ..."):
am I correct in saying that the data from Kafka will not be read into memory
in the cluster (the kafka server is not located on the Spark cluster in my
use case) until the following code is executed?

stream.transform { rdd =>
  val wrapped = YourWrapper(cp, rdd)
  wrapped.join(reference)
}
In which case it will run through the partitioner of the wrapped RDD
when it arrives in the cluster for the first time, i.e. with no shuffle?


Thanks,
Dave.
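
One rough way to sanity-check that, reusing the hypothetical names from Cody's snippet (cp, reference, stream, YourWrapper) and assuming the custom partitioner is the same instance on both sides (or defines equality): when both inputs of the join report the same partitioner, the cogroup behind join() uses narrow one-to-one dependencies and neither side is shuffled.

stream.transform { rdd =>
  val wrapped = YourWrapper(cp, rdd)
  // Same partitioner on both sides => co-partitioned join, no shuffle of either side.
  require(wrapped.partitioner == reference.partitioner,
    "partitioners differ - this join would shuffle the stream side")
  val joined = wrapped.join(reference)
  println(joined.toDebugString)   // a shuffle would show up as an extra stage in this lineage
  joined
}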




Re: Kafka Streaming and partitioning

2016-01-13 Thread Cody Koeninger
In the case here of a kafkaRDD, the data doesn't reside on the cluster,
it's not cached by default.  If you're running kafka on the same nodes as
spark, then data locality would play a factor, but that should be handled
by the existing getPreferredLocations method.
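
As an illustration (sketch only, reusing the hypothetical stream from the earlier snippet), the locality hints the scheduler sees for each batch can be printed from the driver; for the direct stream these come from the Kafka RDD's getPreferredLocations and only help when brokers and executors share hosts.

stream.foreachRDD { rdd =>
  rdd.partitions.foreach { p =>
    // preferredLocations surfaces whatever getPreferredLocations reports for this partition
    println(s"partition ${p.index} prefers: ${rdd.preferredLocations(p).mkString(", ")}")
  }
}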



Re: Kafka Streaming and partitioning

2016-01-13 Thread Dave

Thanks Cody, appreciate the response.

With this pattern the partitioners will now match when the join is
executed.
However, does the wrapper RDD not need to set the partition metadata on
the wrapped RDD in order to allow Spark to know where the data for each
partition resides in the cluster?


Thanks,
Dave.


Re: Kafka Streaming and partitioning

2016-01-13 Thread Cody Koeninger
If two rdds have an identical partitioner, joining should not involve a
shuffle.

You should be able to override the partitioner without calling partitionBy.

Two ways I can think of to do this:
- subclass or modify the direct stream and kafkardd.  They're private, so
you'd need to rebuild just the external kafka project, not all of spark

- write a wrapper subclass of rdd that takes a given custom partitioner and
rdd in the constructor, overrides partitioner, and delegates every other
method to the wrapped rdd.  This should be possible without modification to
any existing spark code.  You'd use it something like

val cp = YourCustomPartitioner(...)
val reference = YourReferenceRDD(cp, ...)
val stream = KafkaUtils.createDirectStream(...)

stream.transform { rdd =>
  val wrapped = YourWrapper(cp, rdd)
  wrapped.join(reference)
}


I haven't had reason to do either one of those approaches, so YMMV, but I
believe others have
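
For anyone attempting the second approach, a minimal untested sketch of such a wrapper might look like the following, assuming the Spark 1.x RDD API (YourWrapper is just the hypothetical name from the snippet above, not an existing class):

import scala.reflect.ClassTag

import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD

// Reports a partitioner we know the data already follows, without moving any
// data; everything else is delegated to the wrapped RDD.
class YourWrapper[K: ClassTag, V: ClassTag](cp: Partitioner, prev: RDD[(K, V)])
  extends RDD[(K, V)](prev) {

  // The whole point: claim the known partitioner so joins see co-partitioned data.
  override val partitioner: Option[Partitioner] = Some(cp)

  // Same partitions as the parent - no repartitioning, hence no shuffle.
  override protected def getPartitions: Array[Partition] = prev.partitions

  // Just read through to the wrapped RDD.
  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] =
    prev.iterator(split, context)

  // Preserve the parent's locality hints (e.g. the Kafka leader hosts).
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    prev.preferredLocations(split)
}

object YourWrapper {
  def apply[K: ClassTag, V: ClassTag](cp: Partitioner, rdd: RDD[(K, V)]) =
    new YourWrapper(cp, rdd)
}

Because the single-parent RDD constructor sets up a one-to-one dependency and the partitions and preferred locations are simply delegated, no data moves and no extra partition metadata has to be set by hand; the only thing the wrapper changes is the partitioner it reports.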




On Wed, Jan 13, 2016 at 3:40 AM, ddav  wrote:

> Hi,
>
> I have the following use case:
>
> 1. Reference data stored in an RDD that is persisted and partitioned using a
> simple custom partitioner.
> 2. Input stream from kafka that uses the same partitioning algorithm as the
> ref data RDD - this partitioning is done in kafka.
>
> I am using kafka direct streams, so the number of kafka partitions maps to
> the number of partitions in the spark RDD. From testing and the documentation
> I see Spark does not know anything about how the data has been partitioned in
> kafka.
>
> In my use case I need to join the reference data RDD and the input stream
> RDD.  Because I have manually ensured the incoming data from kafka uses the
> same partitioning algorithm, I know the data has been grouped correctly in
> the input stream RDD in Spark, but I cannot do a join without a shuffle step,
> because Spark has no knowledge of how the data has been partitioned.
>
> I have two ways to do this.
> 1. Explicitly call partitionBy(customPartitioner) on the input stream RDD,
> followed by a join. This results in a shuffle of the input stream RDD, after
> which the co-partitioned join takes place.
> 2. Call join on the reference data RDD, passing in the input stream RDD.
> Spark will do a shuffle under the hood in this case and the join will take
> place. The join will do its best to run on a node that has local access to
> the reference data RDD.
>
> Is there any difference between the 2 methods above or will both cause the
> same sequence of events to take place in Spark?
> Is all I have stated above correct?
>
> Finally, is there any roadmap feature that would allow the user to push a
> partitioner into an already created RDD without doing a shuffle? Spark in
> this case would trust that the data is set up correctly (as in the use case
> above) and simply fill in the necessary metadata on the RDD partitions, e.g.
> check the first entry in each partition to determine the partition number of
> the data.
>
> Thank you in advance for any help on this issue.
> Dave.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Streaming-and-partitioning-tp25955.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
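
For completeness, the two options from the original post look roughly like this in code (sketch only, reusing the hypothetical cp, reference and stream names from earlier in the thread). Both of them still shuffle the stream side of the join on every batch, which is exactly the cost the wrapper approach is meant to avoid:

// Option 1: explicitly repartition each batch, then a co-partitioned join.
val joined1 = stream.transform { rdd => rdd.partitionBy(cp).join(reference) }

// Option 2: plain join against the reference RDD; Spark shuffles the stream
// side under the hood to match the reference RDD's partitioner.
val joined2 = stream.transform { rdd => reference.join(rdd) }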