Thanks, this works.
Hopefully I haven't missed anything important with this approach.
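
For the record, roughly what the final version looks like (a sketch - the
partition count variable and the state update function are assumptions, not
copied verbatim from my code):

import org.apache.spark.{Partitioner, TaskContext}

// Created once, up front: it only needs the total number of kafka
// topic/partitions, as Cody suggests below.
class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
  // keys are spark partition ids set via TaskContext, so they map straight through
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

val partitioner = new PartitionIdPassthrough(numKafkaPartitions) // assumed count

inputStream
  .map(m => (TaskContext.get().partitionId(), m.value))
  .transform(rdd => new ProxyRDDWithPartitioner(rdd, partitioner))
  .updateStateByKey(updateFunc, partitioner) // updateFunc: assumed state function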

On Tue, Jun 2, 2015 at 8:15 PM, Cody Koeninger <c...@koeninger.org> wrote:

> If you're using the spark partition id directly as the key, then you don't
> need to access offset ranges at all, right?
> You can create a single instance of a partitioner in advance, and all it
> needs to know is the number of partitions (which is just the count of all
> the kafka topic/partitions).
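>
> A sketch of what I mean (the class name is mine):
>
> class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
>   // the key already is the spark partition id, so just return it
>   override def getPartition(key: Any): Int = key.asInstanceOf[Int]
> }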
>
> On Tue, Jun 2, 2015 at 12:40 PM, Krot Viacheslav <krot.vyaches...@gmail.com> wrote:
>
>> Cody,
>>
>> Thanks, good point. I fixed the partition id lookup to:
>>
>> class MyPartitioner(offsetRanges: Array[OffsetRange]) extends Partitioner {
>>   override def numPartitions: Int = offsetRanges.size
>>
>>   override def getPartition(key: Any): Int = {
>>     // this is set in .map(m => (TaskContext.get().partitionId(), m.value))
>>     key.asInstanceOf[Int]
>>   }
>> }
>>
>> inputStream
>>     .map(m => (TaskContext.get().partitionId(), m.value))
>>     .transform { rdd =>
>>         val part = new MyPartitioner(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
>>         new ProxyRDDWithPartitioner(rdd, part)
>>     }
>>     ...
>>
>> But how can I create the same partitioner for the updateStateByKey call? I
>> have no idea how to access the rdd when calling updateStateByKey.
>>
>> On Tue, Jun 2, 2015 at 7:15 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> I think the general idea is worth pursuing.
>>>
>>> However, this line:
>>>
>>>  override def getPartition(key: Any): Int = {
>>>     key.asInstanceOf[(String, Int)]._2
>>>   }
>>>
>>> is using the kafka partition id, not the spark partition index, so it's
>>> going to give you fewer partitions / an incorrect index.
>>>
>>> Cast the rdd to HasOffsetRanges, get the offsetRanges from it.  The
>>> index into the offset range array matches the (spark) partition id.  That
>>> will also tell you what the value of numPartitions should be.
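>>>
>>> Roughly (inside transform, where you have the rdd; a sketch):
>>>
>>> // import org.apache.spark.streaming.kafka.HasOffsetRanges
>>> val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>> val numPartitions = offsetRanges.size  // one spark partition per offset range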
>>>
>>> On Tue, Jun 2, 2015 at 11:50 AM, Krot Viacheslav <krot.vyaches...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>> In my streaming job I'm using the kafka direct streaming approach and want
>>>> to maintain state with updateStateByKey. My PairRDD has the message's topic
>>>> name + partition id as the key, so I assumed updateStateByKey could work
>>>> within the same partitions as the KafkaRDD and not lead to shuffles.
>>>> Actually this is not true: updateStateByKey leads to a cogroup
>>>> transformation that considers the state rdd and the kafka rdd not
>>>> co-partitioned, since the kafka rdd has no partitioner at all. So the
>>>> dependency is considered wide and leads to a shuffle.
>>>>
>>>> I tried to avoid the shuffle by providing a custom partitioner to
>>>> updateStateByKey, but the KafkaRDD needs to use the same partitioner as
>>>> well. For this I created a proxy RDD that just returns my partitioner:
>>>>
>>>> class ProxyRDDWithPartitioner[T: ClassTag](prev: RDD[T], part: Partitioner)
>>>>     extends RDD[T](prev) {
>>>>
>>>>   override val partitioner = Some(part)
>>>>
>>>>   override def compute(split: Partition, context: TaskContext): Iterator[T] =
>>>>     prev.compute(split, context)
>>>>
>>>>   override protected def getPartitions: Array[Partition] = prev.partitions
>>>>
>>>>   override def getPreferredLocations(thePart: Partition): Seq[String] =
>>>>     prev.preferredLocations(thePart)
>>>> }
>>>>
>>>> I use it as:
>>>> val partitioner = new Partitioner {
>>>>   // TODO this should be retrieved from kafka
>>>>   override def numPartitions: Int = 2
>>>>
>>>>   override def getPartition(key: Any): Int = {
>>>>     key.asInstanceOf[(String, Int)]._2
>>>>   }
>>>> }
>>>>
>>>> inputStream
>>>>   .map(m => ((m.topic, m.partition), m.value))
>>>>   .transform(new ProxyRDDWithPartitioner(_, partitioner))
>>>>   .updateStateByKey(func, partitioner)
>>>>   ....
>>>>
>>>> The question is: is it safe to do such a trick?
>>>>