Yes, partitionBy will shuffle unless it happens to be partitioning
with the exact same partitioner the parent RDD already has.
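
To make the "same partitioner" condition concrete, here is a toy model in
plain Python. It is not Spark code: ToyPairRDD, HashPartitioner and
partition_by are made-up names, and the "shuffle" is just building a new
object. The only thing it is meant to mirror is the equality check on the
partitioner that lets partitionBy hand back the parent unchanged.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class HashPartitioner:
    """Hypothetical stand-in for a partitioner; equal when the count is equal."""
    num_partitions: int

    def partition(self, key: int) -> int:
        return hash(key) % self.num_partitions


class ToyPairRDD:
    """Hypothetical stand-in for a pair RDD that remembers its partitioner."""

    def __init__(self, records, partitioner: Optional[HashPartitioner] = None):
        self.records = list(records)
        self.partitioner = partitioner

    def partition_by(self, partitioner: HashPartitioner) -> "ToyPairRDD":
        # Same partitioner as the parent: nothing to move, return the parent.
        if self.partitioner == partitioner:
            return self
        # Anything else: records may change partition, which means a shuffle.
        # Here that is only modelled by creating a new object.
        return ToyPairRDD(self.records, partitioner)


parent = ToyPairRDD([(1, "a"), (2, "b"), (3, "c")], HashPartitioner(3))
same = parent.partition_by(HashPartitioner(3))    # no shuffle
other = parent.partition_by(HashPartitioner(4))   # shuffle
```

An RDD that comes straight out of the Kafka stream has not been through
partitionBy yet, so the first call is the "anything else" case.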

On Wed, Oct 12, 2016 at 8:34 AM, Samy Dindane <s...@dindane.com> wrote:
> Hey Cody,
>
> I ended up choosing a different way to do things, which is using Kafka to
> commit my offsets.
> It works fine, except it stores the offset in ZK instead of a Kafka topic
> (investigating this right now).
>
> I understand your explanations, thank you, but I have one question: when you
> say "Repartition almost always involves a shuffle", are you talking about
> repartition using partitionBy()?
>
>
> On 10/11/2016 01:23 AM, Cody Koeninger wrote:
>>
>> Repartition almost always involves a shuffle.
>>
>> Let me see if I can explain the recovery stuff...
>>
>> Say you start with two kafka partitions, topic-0 and topic-1.
>>
>> You shuffle those across 3 spark partitions, we'll label them A B and C.
>>
>> Your job has written
>>
>> fileA: results for A, offset ranges for topic-0 offsets 10-60 and
>> topic-1 offsets 23 - 66
>> fileB: results for B, offset ranges for topic-0 offsets 10-60 and
>> topic-1 offsets 23 - 66
>>
>> It's in the middle of writing for fileC when it dies.
>>
>> On startup, you need to start the job from
>> topic-0 offsets 10-60 and topic-1 offsets 23 - 66
>> and do all the work in all the partitions (because of the shuffle, you
>> don't know which kafka partition the data came from)
>>
>> You just don't overwrite fileA and fileB, because they already have
>> correct data and offsets.  You just write fileC.
>>
>> Then once you've recovered you go on about your job as normal, starting
>> at topic-0 offsets 60, topic-1 offsets 66
>>
>> Clear as mud?
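
A rough sketch of the recovery steps above, in plain Python rather than
Spark. Everything here is an assumption made for illustration: recover_batch
and toy_shuffle are made-up names, the "files" are entries in a dict, and
the shuffle is a deterministic stand-in that spreads offsets over A, B and C.

```python
from typing import Callable, Dict, List, Tuple

Offsets = Dict[str, Tuple[int, int]]      # kafka partition -> (from, until)
Written = Dict[str, Tuple[list, Offsets]]  # spark partition -> (results, offsets)


def recover_batch(partitions: List[str], written: Written,
                  compute: Callable[[Offsets], Dict[str, list]]) -> Dict[str, int]:
    """Finish an interrupted batch; return where the next batch starts."""
    # Every committed file carries the offset ranges of the whole batch.
    # (If nothing was committed, the offsets have to come from elsewhere.)
    offsets = next(iter(written.values()))[1]
    # Because of the shuffle, redo the work for all partitions...
    recomputed = compute(offsets)
    for name in partitions:
        # ...but leave the files that were already complete alone.
        if name not in written:
            written[name] = (recomputed[name], offsets)
    return {tp: until for tp, (_start, until) in offsets.items()}


def toy_shuffle(offsets: Offsets) -> Dict[str, list]:
    """Made-up deterministic shuffle: offset modulo 3 picks A, B or C."""
    out: Dict[str, list] = {"A": [], "B": [], "C": []}
    for tp, (start, until) in sorted(offsets.items()):
        for off in range(start, until):
            out["ABC"[off % 3]].append((tp, off))
    return out


batch = {"topic-0": (10, 60), "topic-1": (23, 66)}
full = toy_shuffle(batch)
# State after the crash: fileA and fileB exist, fileC does not.
written = {"A": (full["A"], batch), "B": (full["B"], batch)}
next_start = recover_batch(["A", "B", "C"], written, toy_shuffle)
```

This only works if recomputing the batch gives the same split as before the
crash; the toy shuffle is deterministic for that reason.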
>>
>> On Mon, Oct 10, 2016 at 5:36 PM, Samy Dindane <s...@dindane.com> wrote:
>>>
>>>
>>>
>>> On 10/10/2016 8:14 PM, Cody Koeninger wrote:
>>>>
>>>>
>>>> Glad it was helpful :)
>>>>
>>>> As far as executors, my expectation is that if you have multiple
>>>> executors running, and one of them crashes, the failed task will be
>>>> submitted on a different executor.  That is typically what I observe
>>>> in spark apps; if that's not what you're seeing, I'd try to get help on
>>>> that specific issue.
>>>>
>>>> As far as kafka offsets per-partition, you need to atomically write
>>>> your offsets and results in the same place. If that place is a
>>>> filesystem, you need to be using atomic operations (I try to stay away
>>>> from HDFS, but I believe renames are atomic, for instance).
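
A minimal sketch of "offsets and results in the same place" for a local
POSIX filesystem, in plain Python. commit_partition and the JSON layout are
made up for illustration, and this says nothing about how HDFS behaves: the
idea is only that both pieces go into one temporary file, which is then
renamed into place in a single step.

```python
import json
import os
import tempfile


def commit_partition(path: str, results: list, offsets: dict) -> None:
    """Write results and offsets together, then publish them with one rename."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file lives in the target directory so the rename stays on
    # one filesystem.
    fd, tmp = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump({"offsets": offsets, "results": results}, f)
            f.flush()
            os.fsync(f.fileno())
        # Readers see either the old file or the complete new one.
        os.replace(tmp, path)
    except BaseException:
        if os.path.exists(tmp):
            os.remove(tmp)
        raise
```

A file that exists under its final name is then, by construction, a complete
partition with its offsets, which is what the startup check needs.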
>>>
>>>
>>> That's what I am trying to do.
>>>>
>>>> If you're
>>>>
>>>> doing that in normal code, ie with an iterator instead of an rdd or
>>>> dataframe, you may have to do some of that work yourself.
>>>
>>>
>>> How would you do that without an iterator?
>>> As far as I've seen, to have granular control over partitions (and each
>>> partition's checkpoint), one needs to use `foreachPartition` or
>>> `mapPartitions`, thus dealing with an iterator instead of an RDD.
>>>>
>>>>
>>>>
>>>> As far as using partitionBy, even after repartitioning you can still
>>>> use the general idea of writing results and offsets in the same place.
>>>
>>>
>>> I don't understand what you mean. What do you mean by "the same place"?
>>> Why would I want to do that?
>>>>
>>>>
>>>> The major difference is that you need to write all offsets in each
>>>> partition (because things have been shuffled),
>>>
>>>
>>> Oh I didn't think of shuffling after partitioning.
>>> But does it matter if I use partitionBy()?
>>>
>>> I will make a code example once at the office so you can take a look.
>>>>
>>>>
>>>> and need to be more
>>>> careful on startup after a failure.  On startup, you'd see which
>>>> partitions were incomplete, start the job from the offsets in the
>>>> incomplete partitions, do the work for all partitions,
>>>
>>>
>>> It's clear until here.
>>>>
>>>>
>>>> but ignore the
>>>> writes when they get to the complete partitions.  I realize that's
>>>> kind of a complicated description; if it doesn't make sense, ask, or I
>>>> may be able to put up some publicly visible code at some point in the
>>>> future.
>>>
>>>
>>> Yes I don't see what you mean. :)
>>>
>>> I really appreciate your help. Thanks a lot.
>>>
>>>>
>>>>
>>>> On Mon, Oct 10, 2016 at 12:12 PM, Samy Dindane <s...@dindane.com> wrote:
>>>>>
>>>>>
>>>>> I just noticed that you're the author of the code I linked in my previous
>>>>> email. :) It's helpful.
>>>>>
>>>>> When using `foreachPartition` or `mapPartitions`, I noticed I can't ask
>>>>> Spark to write the data to disk using `df.write()` but I need to use
>>>>> the iterator to do so, which means losing the ability to use
>>>>> partitionBy().
>>>>> Do you know a workaround? Or I'll be forced to partition data manually.
>>>>>
>>>>> I think I understand why the job crashes when a single executor does:
>>>>> `df.write()....save()` writes all the partitions at the same time, which
>>>>> fails if one of them has died.
>>>>> Is that right?
>>>>>
>>>>> Thank you.
>>>>>
>>>>> Samy
>>>>>
>>>>>
>>>>> On 10/10/2016 04:58 PM, Samy Dindane wrote:
>>>>>>
>>>>>>
>>>>>>
>>>>>> Hi Cody,
>>>>>>
>>>>>> I am writing a spark job that reads records from a Kafka topic and writes
>>>>>> them to the file system.
>>>>>> This would be straightforward if it weren't for the custom checkpointing
>>>>>> logic I want to have; Spark's checkpointing doesn't suit us as it doesn't
>>>>>> permit code updates.
>>>>>>
>>>>>> The custom checkpointing would be similar to this:
>>>>>>
>>>>>>
>>>>>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala
>>>>>> I am trying to understand how this would work if an executor crashes, so I
>>>>>> tried making one crash manually, but I noticed it kills the whole job
>>>>>> instead of creating another executor to resume the task.
>>>>>> Is that expected? Is there anything wrong with my approach?
>>>>>>
>>>>>> Thank you for your time.
>>>>>>
>>>>>>
>>>>>> On 10/10/2016 04:29 PM, Cody Koeninger wrote:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> What is it you're actually trying to accomplish?
>>>>>>>
>>>>>>> On Mon, Oct 10, 2016 at 5:26 AM, Samy Dindane <s...@dindane.com> wrote:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I managed to make a specific executor crash by using
>>>>>>>> TaskContext.get.partitionId and throwing an exception for a specific
>>>>>>>> executor.
>>>>>>>>
>>>>>>>> The issue I have now is that the whole job stops when a single executor
>>>>>>>> crashes.
>>>>>>>> Do I need to explicitly tell Spark to start a new executor and keep the
>>>>>>>> other ones running?
>>>>>>>>
>>>>>>>>
>>>>>>>> On 10/10/2016 11:19 AM, Samy Dindane wrote:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am writing a streaming job that reads a Kafka topic.
>>>>>>>>> As far as I understand, Spark does a 1:1 mapping between its executors
>>>>>>>>> and Kafka partitions.
>>>>>>>>>
>>>>>>>>> In order to correctly implement my checkpoint logic, I'd like to know
>>>>>>>>> what exactly happens when an executor crashes.
>>>>>>>>> Also, is it possible to kill an executor manually for testing purposes?
>>>>>>>>>
>>>>>>>>> Thank you.
>>>>>>>>>
>>>>>>>>> Samy
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>
>>>
>
