Thanks, this works. Hopefully I didn't miss something important with this approach.
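Cody's last suggestion (quoted below) can be sketched as a partitioner that is built once, before the stream starts, from the total number of Kafka topic/partitions. This is a minimal sketch that assumes Spark's `org.apache.spark.Partitioner` API and records already keyed with `TaskContext.get().partitionId()`. The class name `SparkPartitionIdPartitioner` is hypothetical. Keying this way also sidesteps the `HasOffsetRanges` cast inside `transform` after `.map`: the Spark Kafka integration guide notes that this cast only succeeds in the first method called on the direct stream, not further down a chain.

```scala
import org.apache.spark.Partitioner

// Hypothetical partitioner for records keyed by their Spark partition index.
// Assumption: keys were produced by .map(m => (TaskContext.get().partitionId(), m.value)),
// so each key already *is* the partition it belongs to.
// `partitions` is the total count of Kafka topic/partitions, known up front.
class SparkPartitionIdPartitioner(partitions: Int) extends Partitioner {
  require(partitions > 0, "number of partitions must be positive")

  override def numPartitions: Int = partitions

  // Identity mapping; fail loudly instead of silently wrapping bad keys.
  override def getPartition(key: Any): Int = key match {
    case i: Int if i >= 0 && i < partitions => i
    case other =>
      throw new IllegalArgumentException(
        s"Unexpected key $other for $partitions partitions")
  }

  // Equal by partition count (as HashPartitioner is), so two instances with
  // the same count are treated as the same partitioning.
  override def equals(other: Any): Boolean = other match {
    case p: SparkPartitionIdPartitioner => p.numPartitions == numPartitions
    case _                              => false
  }

  override def hashCode: Int = numPartitions
}
```

Because it depends only on a count, the same instance can be passed both to the proxy RDD in `transform` and to `updateStateByKey(func, partitioner)`, so no RDD is needed at the `updateStateByKey` call. Defining `equals` by count matters because Spark compares partitioners with `==` when deciding whether a cogroup dependency is narrow or needs a shuffle.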
On Tue, Jun 2, 2015 at 20:15, Cody Koeninger <c...@koeninger.org> wrote:
> If you're using the Spark partition id directly as the key, then you don't
> need to access offset ranges at all, right?
> You can create a single instance of a partitioner in advance, and all it
> needs to know is the number of partitions (which is just the count of all
> the Kafka topic/partitions).
>
> On Tue, Jun 2, 2015 at 12:40 PM, Krot Viacheslav <
> krot.vyaches...@gmail.com> wrote:
>
>> Cody,
>>
>> Thanks, good point. I changed the way I get the partition id to:
>>
>> class MyPartitioner(offsetRanges: Array[OffsetRange]) extends Partitioner {
>>   override def numPartitions: Int = offsetRanges.size
>>
>>   override def getPartition(key: Any): Int = {
>>     // this is set in .map(m => (TaskContext.get().partitionId(), m.value))
>>     key.asInstanceOf[Int]
>>   }
>> }
>>
>> inputStream
>>   .map(m => (TaskContext.get().partitionId(), m.value))
>>   .transform { rdd =>
>>     val part = new MyPartitioner(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
>>     new ProxyRDDWithPartitioner(rdd, part)
>>   }
>> ...
>>
>> But how can I create the same partitioner for the updateStateByKey call? I
>> have no idea how to access the RDD when calling updateStateByKey.
>>
>> On Tue, Jun 2, 2015 at 19:15, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> I think the general idea is worth pursuing.
>>>
>>> However, this line:
>>>
>>> override def getPartition(key: Any): Int = {
>>>   key.asInstanceOf[(String, Int)]._2
>>> }
>>>
>>> is using the Kafka partition id, not the Spark partition index, so it's
>>> going to give you fewer partitions / incorrect indices.
>>>
>>> Cast the RDD to HasOffsetRanges and get the offsetRanges from it. The
>>> index into the offset range array matches the (Spark) partition id. That
>>> will also tell you what the value of numPartitions should be.
>>>
>>> On Tue, Jun 2, 2015 at 11:50 AM, Krot Viacheslav <
>>> krot.vyaches...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>> In my streaming job I'm using the Kafka direct streaming approach and want
>>>> to maintain state with updateStateByKey. My PairRDD has the message's topic
>>>> name + partition id as its key, so I assumed that updateStateByKey could
>>>> work within the same partitions as the KafkaRDD and not lead to shuffles.
>>>> Actually this is not true, because updateStateByKey performs a cogroup
>>>> transformation, which considers the state RDD and the Kafka RDD not
>>>> co-partitioned, since the Kafka RDD has no partitioner at all. So the
>>>> dependency is considered wide and leads to a shuffle.
>>>>
>>>> I tried to avoid shuffling by providing a custom partitioner to
>>>> updateStateByKey, but the KafkaRDD needs to use the same partitioner. For
>>>> this I created a proxy RDD that just returns my partitioner:
>>>>
>>>> class ProxyRDDWithPartitioner[T: ClassTag](prev: RDD[T], part: Partitioner)
>>>>     extends RDD[T](prev) {
>>>>
>>>>   override val partitioner = Some(part)
>>>>
>>>>   override def compute(split: Partition, context: TaskContext): Iterator[T] =
>>>>     prev.compute(split, context)
>>>>
>>>>   override protected def getPartitions: Array[Partition] = prev.partitions
>>>>
>>>>   override def getPreferredLocations(thePart: Partition): Seq[String] =
>>>>     prev.preferredLocations(thePart)
>>>> }
>>>>
>>>> I use it as:
>>>>
>>>> val partitioner = new Partitioner {
>>>>   // TODO this should be retrieved from kafka
>>>>   override def numPartitions: Int = 2
>>>>
>>>>   override def getPartition(key: Any): Int = {
>>>>     key.asInstanceOf[(String, Int)]._2
>>>>   }
>>>> }
>>>>
>>>> inputStream
>>>>   .map(m => ((m.topic, m.partition), m.value))
>>>>   .transform(new ProxyRDDWithPartitioner(_, partitioner))
>>>>   .updateStateByKey(func, partitioner)
>>>>   ....
>>>>
>>>> The question is: is it safe to do such a trick?
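Cody's first reply says that keying by the Kafka partition id (`._2`) gives fewer partitions and incorrect indices. One situation where this shows up is a stream over several topics: Kafka partition ids start at 0 in every topic, while Spark partition indices run across all topic/partitions. The Spark-free illustration below assumes two hypothetical topics with two partitions each. `KafkaTopicPartition` is a made-up stand-in for the `topic`/`partition` fields of `OffsetRange`, and the zip-with-index mirrors the statement that the index into the offset range array is the Spark partition id.

```scala
// Hypothetical stand-in for the (topic, partition) fields of an OffsetRange.
final case class KafkaTopicPartition(topic: String, partition: Int)

object PartitionIdDemo {
  // Spark partition index of each topic/partition = its position in the
  // offset range array (as stated in the thread).
  def sparkIndices(ranges: Seq[KafkaTopicPartition]): Map[KafkaTopicPartition, Int] =
    ranges.zipWithIndex.toMap

  // Distinct target partitions obtained when the Kafka partition id is the key.
  def kafkaIdTargets(ranges: Seq[KafkaTopicPartition]): Set[Int] =
    ranges.map(_.partition).toSet

  def main(args: Array[String]): Unit = {
    val ranges = Seq(
      KafkaTopicPartition("topicA", 0), KafkaTopicPartition("topicA", 1),
      KafkaTopicPartition("topicB", 0), KafkaTopicPartition("topicB", 1))

    println(s"numPartitions = ${ranges.size}")                                  // 4
    println(s"Spark indices = ${sparkIndices(ranges).values.toList.sorted}")    // List(0, 1, 2, 3)
    println(s"Kafka-id targets = ${kafkaIdTargets(ranges).toList.sorted}")      // List(0, 1)
  }
}
```

Here records from `topicB` partition 0 live in Spark partition 2 but would be assigned to partition 0, and only two of the four Spark partitions are ever targeted, which is the mismatch the proxy RDD would then misreport.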