Cody,
Thanks, good point. I changed how I get the partition id:
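import org.apache.spark.{Partitioner, TaskContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

class MyPartitioner(offsetRanges: Array[OffsetRange]) extends Partitioner {
  override def numPartitions: Int = offsetRanges.length
  override def getPartition(key: Any): Int = {
    // the key is the Spark partition index, set in
    // .map(m => (TaskContext.get().partitionId(), m.value))
    key.asInstanceOf[Int]
  }
}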
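inputStream
  .transform { rdd =>
    // The cast to HasOffsetRanges only succeeds on the RDD produced directly
    // by the direct stream, so read offsetRanges before calling map on it.
    val part = new MyPartitioner(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
    val keyed = rdd.map(m => (TaskContext.get().partitionId(), m.value))
    new ProxyRDDWithPartitioner(keyed, part)
  }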
...
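The trick depends on one invariant: every record must already sit in the partition that the claimed partitioner assigns to its key, otherwise a one-to-one cogroup would pair records with the wrong state partition. The plain-Scala model below (no Spark dependency) sketches that check under stated assumptions: partitions are simulated as nested `Seq`s, and `keyByPartitionIndex` and `isConsistent` are hypothetical helpers. Keying each record by its partition index, as `TaskContext.get().partitionId()` does inside `map`, makes an identity `getPartition` consistent by construction.

```scala
object PartitionIndexKeying {
  // Hypothetical model: each inner Seq stands for the contents of one Spark partition.
  type Partitions[V] = Seq[Seq[V]]

  // Mimics `.map(m => (TaskContext.get().partitionId(), m.value))`:
  // every record is keyed by the index of the partition it lives in.
  def keyByPartitionIndex[V](parts: Partitions[V]): Partitions[(Int, V)] =
    parts.zipWithIndex.map { case (records, idx) => records.map(v => (idx, v)) }

  // A partitioner may only be "claimed" for the data if every record already
  // sits in the partition that getPartition assigns to its key.
  def isConsistent[K, V](parts: Partitions[(K, V)], getPartition: K => Int): Boolean =
    parts.zipWithIndex.forall { case (records, idx) =>
      records.forall { case (k, _) => getPartition(k) == idx }
    }

  def main(args: Array[String]): Unit = {
    val raw: Partitions[String] = Seq(Seq("a", "b"), Seq("c"), Seq.empty, Seq("d"))
    val keyed = keyByPartitionIndex(raw)

    // MyPartitioner.getPartition is effectively the identity on Int keys.
    println(isConsistent[Int, String](keyed, k => k))           // true
    // Any other mapping breaks the invariant, e.g. shifting every key by one.
    println(isConsistent[Int, String](keyed, k => (k + 1) % 4)) // false
  }
}
```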
But how can I create the same partitioner for the updateStateByKey call? I
don't see how to access the RDD when calling updateStateByKey.
On Tue, Jun 2, 2015 at 19:15, Cody Koeninger <[email protected]> wrote:
> I think the general idea is worth pursuing.
>
> However, this line:
>
> override def getPartition(key: Any): Int = {
>   key.asInstanceOf[(String, Int)]._2
> }
>
> is using the Kafka partition id, not the Spark partition index, so it's
> going to give you fewer partitions than you actually have, and incorrect indexes.
>
> Cast the RDD to HasOffsetRanges and get the offsetRanges from it. The index
> into the offsetRanges array matches the (Spark) partition id. The array's
> length also tells you what the value of numPartitions should be.
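A small plain-Scala illustration of the point (no Spark dependency). `RangeStub` is a hypothetical stand-in for Spark's `OffsetRange` that keeps only the topic and Kafka partition, and the topic names are made up. With two topics of two partitions each, the array index gives every Spark partition its own slot and the array length is the right `numPartitions`, while the Kafka partition ids collapse to just two values.

```scala
object OffsetRangeIndexing {
  // Hypothetical stand-in for Spark's OffsetRange: only topic and Kafka partition matter here.
  final case class RangeStub(topic: String, partition: Int)

  // One entry per Spark partition of a batch, in Spark partition order.
  // Topic names are made up for illustration.
  val offsetRanges: Array[RangeStub] = Array(
    RangeStub("clicks", 0), RangeStub("clicks", 1),
    RangeStub("views", 0), RangeStub("views", 1)
  )

  // numPartitions should be the array length.
  val numPartitions: Int = offsetRanges.length

  // The Spark partition index of a topic-partition is its position in the array.
  val sparkIndex: Map[(String, Int), Int] =
    offsetRanges.zipWithIndex.map { case (r, i) => (r.topic, r.partition) -> i }.toMap

  // Using the Kafka partition id as the index: both topics collapse onto 0 and 1.
  val distinctKafkaIds: Int = offsetRanges.map(_.partition).distinct.length

  def main(args: Array[String]): Unit = {
    println(numPartitions)            // 4
    println(sparkIndex(("views", 1))) // 3
    println(distinctKafkaIds)         // 2
  }
}
```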
>
> On Tue, Jun 2, 2015 at 11:50 AM, Krot Viacheslav <[email protected]> wrote:
>
>> Hi all,
>> In my streaming job I'm using the Kafka direct streaming approach and want
>> to maintain state with updateStateByKey. My PairRDD uses the message's topic
>> name + partition id as its key, so I assumed that updateStateByKey could work
>> within the same partitions as the KafkaRDD and not cause shuffles. That is not
>> actually true: updateStateByKey leads to a cogroup transformation, which
>> decides that the state RDD and the Kafka RDD are not co-partitioned, because
>> the KafkaRDD has no partitioner at all. So the dependency is treated as wide
>> and leads to a shuffle.
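As far as I can tell, the check described here lives in Spark's `CoGroupedRDD.getDependencies`: a parent gets a one-to-one (narrow) dependency only if its `partitioner` equals the cogroup's partitioner, and a shuffle dependency otherwise. The plain-Scala model below mimics only that comparison; `Dep`, `Narrow`, `Shuffle`, `IndexPartitioner`, `RefPartitioner` and `dependencyFor` are hypothetical names, not Spark classes. It also shows why equality matters: a parent without a partitioner (like a KafkaRDD) always shuffles, and a partitioner that keeps default reference equality only matches the very same instance, whereas one with value-based `equals` (as `HashPartitioner` has) matches any instance with the same `numPartitions`.

```scala
object CoGroupDependencyModel {
  sealed trait Dep
  case object Narrow extends Dep
  case object Shuffle extends Dep

  // Hypothetical partitioner with value-based equality, in the spirit of
  // HashPartitioner, which compares instances by numPartitions.
  final class IndexPartitioner(val numPartitions: Int) {
    override def equals(other: Any): Boolean = other match {
      case p: IndexPartitioner => p.numPartitions == numPartitions
      case _                   => false
    }
    override def hashCode(): Int = numPartitions
  }

  // Hypothetical partitioner that keeps the default reference equality.
  final class RefPartitioner(val numPartitions: Int)

  // Models the comparison: narrow only if the parent's partitioner equals the target.
  def dependencyFor(parent: Option[AnyRef], target: AnyRef): Dep =
    if (parent == Some(target)) Narrow else Shuffle

  def main(args: Array[String]): Unit = {
    val part = new IndexPartitioner(4)
    println(dependencyFor(None, part))                          // Shuffle (no partitioner)
    println(dependencyFor(Some(part), part))                    // Narrow
    println(dependencyFor(Some(new IndexPartitioner(4)), part)) // Narrow (equal by value)

    val ref = new RefPartitioner(4)
    println(dependencyFor(Some(new RefPartitioner(4)), ref))    // Shuffle (different instance)
  }
}
```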
>>
>> I tried to avoid shuffling by passing a custom partitioner to
>> updateStateByKey, but then the KafkaRDD needs to use the same partitioner.
>> For this I created a proxy RDD that just returns my partitioner.
>>
>> class ProxyRDDWithPartitioner[T: ClassTag](prev: RDD[T], part: Partitioner)
>>     extends RDD[T](prev) {
>>
>>   override val partitioner: Option[Partitioner] = Some(part)
>>
>>   override def compute(split: Partition, context: TaskContext): Iterator[T] =
>>     prev.compute(split, context)
>>
>>   override protected def getPartitions: Array[Partition] = prev.partitions
>>
>>   override def getPreferredLocations(thePart: Partition): Seq[String] =
>>     prev.preferredLocations(thePart)
>> }
>>
>> I use it as:
>> val partitioner = new Partitioner {
>>   // TODO: this should be retrieved from Kafka
>>   override def numPartitions: Int = 2
>>
>>   override def getPartition(key: Any): Int = {
>>     key.asInstanceOf[(String, Int)]._2
>>   }
>> }
>>
>> inputStream
>>   .map(m => ((m.topic, m.partition), m.value))
>>   .transform(new ProxyRDDWithPartitioner(_, partitioner))
>>   .updateStateByKey(func, partitioner)
>>   ...
>>
>> The question is: is it safe to do this kind of trick?
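The `func` passed to `updateStateByKey` above is not shown in the thread. The `updateStateByKey(updateFunc, partitioner)` overload takes an update function of shape `(Seq[V], Option[S]) => Option[S]`: the values that arrived for a key in this batch plus its previous state, returning the new state, or `None` to drop the key. The plain-Scala sketch below uses a hypothetical running message count as `func` and threads the state through two simulated batches by hand; it does not involve Spark.

```scala
object UpdateStateSketch {
  // Hypothetical `func`: keeps a running count of messages seen per key.
  val func: (Seq[String], Option[Long]) => Option[Long] =
    (newValues, state) => Some(state.getOrElse(0L) + newValues.size)

  def main(args: Array[String]): Unit = {
    // Simulate two batches for the same key, passing the state along by hand.
    val afterBatch1 = func(Seq("m1", "m2"), None)
    val afterBatch2 = func(Seq("m3"), afterBatch1)
    println(afterBatch1) // Some(2)
    println(afterBatch2) // Some(3)
  }
}
```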