Cody Koeninger-2 wrote
> In fact, you're using the 2 arg form of reduce by key to shrink it down to
> 1 partition
> 
>  reduceByKey(sumFunc, 1)
> 
> But you started with 4 kafka partitions?  So they're definitely no longer
> 1:1

True. I added the second arg because we were seeing multiple threads
attempting to update the same offset. Setting it to 1 prevented that but
doesn't fix the core issue.


Cody Koeninger-2 wrote
>> This is what I'm suggesting, in pseudocode
>>
>> rdd.mapPartitionsWithIndex { case (i, iter) =>
>>    offset = offsets(i)
>>    result = yourReductionFunction(iter)
>>    transaction {
>>       save(result)
>>       save(offset)
>>    }
>> }.foreach { (_: Nothing) => () }
>>
>> where yourReductionFunction is just normal scala code.
>>

I'll give this a try. Thanks, Cody.





--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/practical-usage-of-the-new-exactly-once-supporting-DirectKafkaInputDStream-tp11916p11928.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org

Reply via email to