Re: Coalesce behaviour

2018-11-19 Thread Sergey Zhemzhitsky
which asks Spark to launch only 20 
>>>>> reducers to process the data which were prepared for 1000 reducers. 
>>>>> Since the reducers have heavy work (sorting), you OOM. In general, your 
>>>>> work flow is: 1000 mappers -> 20 reducers.
>>>>>
>>>>> In your second example, the coalesce introduces shuffle, so your work 
>>>>> flow is: 1000 mappers -> 1000 reducers(also mappers) -> 20 reducers. The 
>>>>> sorting is done by 1000 tasks so no OOM.
>>>>>
>>>>> BTW have you tried the DataFrame API? With Spark SQL, the memory management 
>>>>> is more precise, so even if we only have 20 tasks to do the heavy sorting, 
>>>>> the system should just have more disk spills instead of OOM.
>>>>>
>>>>>
>>>>> On Sat, Oct 13, 2018 at 11:35 AM Koert Kuipers  wrote:
>>>>>>
>>>>>> how can i get a shuffle with 2048 partitions and 2048 tasks and then a 
>>>>>> map phase with 10 partitions and 10 tasks that writes to hdfs?
>>>>>>
>>>>>> every time i try to do this using coalesce the shuffle ends up having 10 
>>>>>> tasks which is unacceptable due to OOM. this makes coalesce somewhat 
>>>>>> useless.
>>>>>>
>>>>>> On Wed, Oct 10, 2018 at 9:06 AM Wenchen Fan  wrote:
>>>>>>>
>>>>>>> Note that RDD partitions and Spark tasks are not always a 1-1 mapping.
>>>>>>>
>>>>>>> Assuming `rdd1` has 100 partitions, and `rdd2 = rdd1.coalesce(10)`. 
>>>>>>> Then `rdd2` has 10 partitions, and there is no shuffle between `rdd1` 
>>>>>>> and `rdd2`. During scheduling, `rdd1` and `rdd2` are in the same stage, 
>>>>>>> and this stage has 10 tasks (decided by the last RDD). This means, each 
>>>>>>> Spark task will process 10 partitions of `rdd1`.
>>>>>>>
>>>>>>> Looking at your example, I don't see where the problem is. Can you 
>>>>>>> describe what is not expected?
>>>>>>>
>>>>>>> On Wed, Oct 10, 2018 at 2:11 PM Sergey Zhemzhitsky  
>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Well, it seems that I can still extend the CoalesceRDD to make it 
>>>>>>>> preserve the total number of partitions from the parent RDD, reduce 
>>>>>>>> some partitions in the same way as the original coalesce does for 
>>>>>>>> map-only jobs, and fill the gaps (partitions which should reside at the 
>>>>>>>> positions of the coalesced ones) with just a special kind of 
>>>>>>>> partitions which do not have any parent dependencies and always return 
>>>>>>>> an empty iterator.
>>>>>>>>
>>>>>>>> I believe this should work as desired (at least the previous 
>>>>>>>> ShuffleMapStage will think that the number of partitions in the next 
>>>>>>>> stage, which it generates shuffle output for, is not changed).
>>>>>>>>
>>>>>>>> There are a few issues though - the existence of empty partitions which 
>>>>>>>> can be evaluated almost for free, and empty output files from these 
>>>>>>>> empty partitions, which can be avoided by means of LazyOutputFormat in 
>>>>>>>> the case of RDDs.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Oct 8, 2018, 23:57 Koert Kuipers  wrote:
>>>>>>>>>
>>>>>>>>> although i personally would describe this as a bug, the answer will be 
>>>>>>>>> that this is the intended behavior. the coalesce "infects" the 
>>>>>>>>> shuffle before it, making a coalesce useless for reducing output 
>>>>>>>>> files after a shuffle with many partitions by design.
>>>>>>>>>
>>>>>>>>> your only option left is a repartition for which you pay the price in 
>>>>>>>>> that it introduces another expensive shuffle.
>>>>>>>>>
>>>>>>>>> interestingly if you do a coalesce on a map-only job it knows how to 
>>>>>>>>> reduce the partitions and output files w

Re: Coalesce behaviour

2018-10-13 Thread Sergey Zhemzhitsky
I've tried the same sample with the DataFrame API and it's much more
stable, although it's backed by the RDD API.

This sample works without any issues and any additional Spark tuning

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
val df = rdd.map(item => item._1.toString -> item._2.toString).toDF()
df.repartition(1000,$"_1")
  .sortWithinPartitions($"_2")
  .coalesce(20)
  .count

The other thing I've noticed is that in case of RDDs the sample below

rdd.map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20,false)
  .count

... always fails on the reduce side of the shuffle.
The failures occur because of "Container killed by YARN for exceeding
memory limits. ... Consider boosting
spark.yarn.executor.memoryOverhead" errors.
If I give the containers more memory the job stabilises, but it's
just a matter of time before it fails again on a slightly increased
amount of data.

The job also stabilises if spills are forced by setting the
spark.shuffle.spill.numElementsForceSpillThreshold=N property.
So I believe that the issue is connected with external sorting
(org.apache.spark.util.collection.ExternalSorter) on the reduce side
of the shuffle, although there is a lot of logic to detect when the
spill should happen.
Do you think that this is due to an incorrect estimate of the memory
remaining for the container, or something else? ... because "Container killed by
YARN for exceeding memory limits" continues to happen.
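For reference, the force-spill workaround described above can be passed at shell startup. The property is an internal, undocumented Spark setting, and the threshold value below is purely illustrative - it would have to be tuned per workload:

```shell
# Illustrative only: force the shuffle-side ExternalSorter to spill after
# N in-memory elements, trading memory pressure for extra disk I/O.
# The value 5000000 is a placeholder, not a recommendation.
spark-shell \
  --conf spark.shuffle.spill.numElementsForceSpillThreshold=5000000
```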

On Sat, Oct 13, 2018 at 8:39 AM Wenchen Fan  wrote:
>
> In your first example, the root RDD has 1000 partitions, then you do a 
> shuffle (with repartitionAndSortWithinPartitions), and shuffles data to 1000 
> reducers. Then you do coalesce, which asks Spark to launch only 20 reducers 
> to process the data which were prepared for 1000 reducers. Since the 
> reducers have heavy work (sorting), you OOM. In general, your work flow is: 
> 1000 mappers -> 20 reducers.
>
> In your second example, the coalesce introduces shuffle, so your work flow 
> is: 1000 mappers -> 1000 reducers(also mappers) -> 20 reducers. The sorting 
> is done by 1000 tasks so no OOM.
>
> BTW have you tried the DataFrame API? With Spark SQL, the memory management is 
> more precise, so even if we only have 20 tasks to do the heavy sorting, the 
> system should just have more disk spills instead of OOM.
>
>
> On Sat, Oct 13, 2018 at 11:35 AM Koert Kuipers  wrote:
>>
>> how can i get a shuffle with 2048 partitions and 2048 tasks and then a map 
>> phase with 10 partitions and 10 tasks that writes to hdfs?
>>
>> every time i try to do this using coalesce the shuffle ends up having 10 
>> tasks which is unacceptable due to OOM. this makes coalesce somewhat useless.
>>
>> On Wed, Oct 10, 2018 at 9:06 AM Wenchen Fan  wrote:
>>>
>>> Note that RDD partitions and Spark tasks are not always a 1-1 mapping.
>>>
>>> Assuming `rdd1` has 100 partitions, and `rdd2 = rdd1.coalesce(10)`. Then 
>>> `rdd2` has 10 partitions, and there is no shuffle between `rdd1` and 
>>> `rdd2`. During scheduling, `rdd1` and `rdd2` are in the same stage, and 
>>> this stage has 10 tasks (decided by the last RDD). This means, each Spark 
>>> task will process 10 partitions of `rdd1`.
>>>
>>> Looking at your example, I don't see where the problem is. Can you describe 
>>> what is not expected?
>>>
>>> On Wed, Oct 10, 2018 at 2:11 PM Sergey Zhemzhitsky  
>>> wrote:
>>>>
>>>> Well, it seems that I can still extend the CoalesceRDD to make it preserve 
>>>> the total number of partitions from the parent RDD, reduce some partitions 
>>>> in the same way as the original coalesce does for map-only jobs, and fill 
>>>> the gaps (partitions which should reside at the positions of the coalesced 
>>>> ones) with just a special kind of partitions which do not have any parent 
>>>> dependencies and always return an empty iterator.
>>>>
>>>> I believe this should work as desired (at least the previous 
>>>> ShuffleMapStage will think that the number of partitions in the next stage, 
>>>> which it generates shuffle output for, is not changed).
>>>>
>>>> There are a few issues though - the existence of empty partitions which can 
>>>> be evaluated almost for free, and empty output files from these empty 
>>>> partitions, which can be avoided by means of LazyOutputFormat in the case of RDDs.
>>>>
>>>>
>>>>
>>>> On Mon, Oct 8, 2018, 23:57 Koert Kuipers  wrote:
>>>>&g

Re: Coalesce behaviour

2018-10-12 Thread Sergey Zhemzhitsky
... sorry for that, but there is a mistake in the second sample, here
is the right one

// fails with either OOM or 'Container killed by YARN for exceeding
memory limits ... spark.yarn.executor.memoryOverhead'
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20,false)
  .count

// works as expected
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20,true)
  .count
On Fri, Oct 12, 2018 at 7:20 PM Sergey Zhemzhitsky  wrote:
>
> I'd like to reduce the number of files written to hdfs without
> introducing additional shuffles and at the same time to preserve the
> stability of the job, and also I'd like to understand why the samples
> below work in one case and fail in another one.
>
> Consider the following example which does the same thing using the
> same resources, but fails in one case and works without issues in
> another one if there is an additional shuffle introduced:
>
> spark-shell \
>   --num-executors=5 \
>   --executor-cores=2 \
>   --master=yarn-client \
>   --conf spark.executor.memory=4g \
>   --conf spark.executor.memoryOverhead=1024 \
>   --conf spark.dynamicAllocation.enabled=false
>
> import org.apache.hadoop.io._
> import org.apache.hadoop.io.compress._
> import org.apache.commons.lang._
> import org.apache.spark._
>
> // generate 100M records of sample data
> sc.makeRDD(1 to 1000, 1000)
>   .flatMap(item => (1 to 10)
> .map(i => new
> Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) -> new
> Text(RandomStringUtils.randomAlphanumeric(1024)))
>   )
>   .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
> val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
>
> // count unique keys
> rdd.keys.map(_.toString).distinct.count
> // in my case it's equal to 46656
>
> // fails with either OOM or 'Container killed by YARN for exceeding
> memory limits ... spark.yarn.executor.memoryOverhead'
> rdd
>   .map(item => item._1.toString -> item._2.toString)
>   .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
>   .coalesce(20,false)
>   .count
>
> // works as expected
> rdd
>   .map(item => item._1.toString -> item._2.toString)
>   .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
>   .coalesce(20,false)
>   .count
> On Wed, Oct 10, 2018 at 4:06 PM Wenchen Fan  wrote:
> >
> > Note that RDD partitions and Spark tasks are not always a 1-1 mapping.
> >
> > Assuming `rdd1` has 100 partitions, and `rdd2 = rdd1.coalesce(10)`. Then 
> > `rdd2` has 10 partitions, and there is no shuffle between `rdd1` and 
> > `rdd2`. During scheduling, `rdd1` and `rdd2` are in the same stage, and 
> > this stage has 10 tasks (decided by the last RDD). This means, each Spark 
> > task will process 10 partitions of `rdd1`.
> >
> > Looking at your example, I don't see where the problem is. Can you describe 
> > what is not expected?
> >
> > On Wed, Oct 10, 2018 at 2:11 PM Sergey Zhemzhitsky  
> > wrote:
> >>
> >> Well, it seems that I can still extend the CoalesceRDD to make it preserve 
> >> the total number of partitions from the parent RDD, reduce some partitions 
> >> in the same way as the original coalesce does for map-only jobs, and fill 
> >> the gaps (partitions which should reside at the positions of the coalesced 
> >> ones) with just a special kind of partitions which do not have any parent 
> >> dependencies and always return an empty iterator.
> >>
> >> I believe this should work as desired (at least the previous 
> >> ShuffleMapStage will think that the number of partitions in the next stage, 
> >> which it generates shuffle output for, is not changed).
> >>
> >> There are a few issues though - the existence of empty partitions which can 
> >> be evaluated almost for free, and empty output files from these empty 
> >> partitions, which can be avoided by means of LazyOutputFormat in the case of RDDs.
> >>
> >>
> >>
> >> On Mon, Oct 8, 2018, 23:57 Koert Kuipers  wrote:
> >>>
> >>> although i personally would describe this as a bug, the answer will be 
> >>> that this is the intended behavior. the coalesce "infects" the shuffle 
> >>> before it, making a coalesce useless for reducing output files after a 
> >>> shuffle with many partitions by design.
> >>>
> >>> your only option left is a repartition for

Re: Coalesce behaviour

2018-10-12 Thread Sergey Zhemzhitsky
I'd like to reduce the number of files written to hdfs without
introducing additional shuffles and at the same time to preserve the
stability of the job, and also I'd like to understand why the samples
below work in one case and fail in another one.

Consider the following example which does the same thing using the
same resources, but fails in one case and works without issues in
another one if there is an additional shuffle introduced:

spark-shell \
  --num-executors=5 \
  --executor-cores=2 \
  --master=yarn-client \
  --conf spark.executor.memory=4g \
  --conf spark.executor.memoryOverhead=1024 \
  --conf spark.dynamicAllocation.enabled=false

import org.apache.hadoop.io._
import org.apache.hadoop.io.compress._
import org.apache.commons.lang._
import org.apache.spark._

// generate 100M records of sample data
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 10)
.map(i => new
Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) -> new
Text(RandomStringUtils.randomAlphanumeric(1024)))
  )
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])

// count unique keys
rdd.keys.map(_.toString).distinct.count
// in my case it's equal to 46656

// fails with either OOM or 'Container killed by YARN for exceeding
memory limits ... spark.yarn.executor.memoryOverhead'
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20,false)
  .count

// works as expected
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20,false)
  .count
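A back-of-the-envelope model of the two samples above, in plain Scala (no Spark), shows why only the second one survives: the sort runs inside 20 tasks in the first case and inside 1000 tasks in the second. The record sizes are taken from the generation code above; treat the exact numbers as illustrative.

```scala
// Plain-Scala estimate (not Spark) of per-task sort load for the pipelines
// above: ~100M records of a ~3-char key plus a 1024-char value each.
val records = 100000000L
val bytesPerRecord = 3L + 1024L
val totalBytes = records * bytesPerRecord

def bytesPerTask(tasks: Int): Long = totalBytes / tasks

val heavyCase = bytesPerTask(20)    // coalesce(20, false): 20 sorting tasks
val lightCase = bytesPerTask(1000)  // coalesce(20, true): 1000 sorting tasks

// ~5 GB per task vs ~100 MB per task
println(s"per-task bytes: heavy=$heavyCase light=$lightCase")
```

With ~5 GB to sort per task, a 4 GB executor with 1 GB overhead has little chance, while ~100 MB per task fits comfortably - which matches the observed OOM/kill behaviour.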
On Wed, Oct 10, 2018 at 4:06 PM Wenchen Fan  wrote:
>
> Note that RDD partitions and Spark tasks are not always a 1-1 mapping.
>
> Assuming `rdd1` has 100 partitions, and `rdd2 = rdd1.coalesce(10)`. Then 
> `rdd2` has 10 partitions, and there is no shuffle between `rdd1` and `rdd2`. 
> During scheduling, `rdd1` and `rdd2` are in the same stage, and this stage 
> has 10 tasks (decided by the last RDD). This means, each Spark task will 
> process 10 partitions of `rdd1`.
>
> Looking at your example, I don't see where the problem is. Can you describe 
> what is not expected?
>
> On Wed, Oct 10, 2018 at 2:11 PM Sergey Zhemzhitsky  wrote:
>>
>> Well, it seems that I can still extend the CoalesceRDD to make it preserve 
>> the total number of partitions from the parent RDD, reduce some partitions in 
>> the same way as the original coalesce does for map-only jobs, and fill the 
>> gaps (partitions which should reside at the positions of the coalesced ones) 
>> with just a special kind of partitions which do not have any parent 
>> dependencies and always return an empty iterator.
>>
>> I believe this should work as desired (at least the previous ShuffleMapStage 
>> will think that the number of partitions in the next stage, which it generates 
>> shuffle output for, is not changed).
>>
>> There are a few issues though - the existence of empty partitions which can be 
>> evaluated almost for free, and empty output files from these empty partitions, 
>> which can be avoided by means of LazyOutputFormat in the case of RDDs.
>>
>>
>>
>> On Mon, Oct 8, 2018, 23:57 Koert Kuipers  wrote:
>>>
>>> although i personally would describe this as a bug, the answer will be that 
>>> this is the intended behavior. the coalesce "infects" the shuffle before 
>>> it, making a coalesce useless for reducing output files after a shuffle 
>>> with many partitions by design.
>>>
>>> your only option left is a repartition for which you pay the price in that 
>>> it introduces another expensive shuffle.
>>>
>>> interestingly if you do a coalesce on a map-only job it knows how to reduce 
>>> the partitions and output files without introducing a shuffle, so clearly 
>>> it is possible, but i dont know how to get this behavior after a shuffle in 
>>> an existing job.
>>>
>>> On Fri, Oct 5, 2018 at 6:34 PM Sergey Zhemzhitsky  
>>> wrote:
>>>>
>>>> Hello guys,
>>>>
>>>> Currently I'm a little bit confused with coalesce behaviour.
>>>>
>>>> Consider the following use case - I'd like to join two pretty big RDDs.
>>>> To make a join more stable and to prevent it from failing with OOM, the RDDs
>>>> are usually repartitioned to redistribute data more evenly and to
>>>> prevent every partition from hitting the 2GB limit. The join then ends up
>>>> with a lot of partitions.
>>>>
>>

Re: Coalesce behaviour

2018-10-09 Thread Sergey Zhemzhitsky
Well, it seems that I can still extend the CoalesceRDD to make it preserve
the total number of partitions from the parent RDD, reduce some partitions
in the same way as the original coalesce does for map-only jobs, and fill
the gaps (partitions which should reside at the positions of the coalesced
ones) with just a special kind of partitions which do not have any parent
dependencies and always return an empty iterator.

I believe this should work as desired (at least the previous
ShuffleMapStage will think that the number of partitions in the next stage,
which it generates shuffle output for, is not changed).

There are a few issues though - the existence of empty partitions which can be
evaluated almost for free, and empty output files from these empty partitions,
which can be avoided by means of LazyOutputFormat in the case of RDDs.
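The padding idea can be sketched as a toy model in plain Scala. This is not real Spark internals - the names `PaddedPartition`, `CoalescedPartition` and `padCoalesce` are hypothetical, and a real implementation would extend `RDD`/`Partition` - but it shows the partition layout the message describes: the parent's count is preserved, with only a few partitions doing real work.

```scala
// Toy model of the idea above: keep the parent RDD's partition count by
// padding the "real" coalesced partitions with empty fillers that would
// have no parent dependencies and yield empty iterators.
sealed trait PaddedPartition
case class CoalescedPartition(parentIndices: Seq[Int]) extends PaddedPartition
case object EmptyPartition extends PaddedPartition

def padCoalesce(numParent: Int, numCoalesced: Int): Seq[PaddedPartition] = {
  val groupSize = math.ceil(numParent.toDouble / numCoalesced).toInt
  val real: Seq[PaddedPartition] =
    (0 until numParent).grouped(groupSize).map(g => CoalescedPartition(g)).toSeq
  // pad so the previous ShuffleMapStage still sees numParent partitions
  real ++ Seq.fill(numParent - real.size)(EmptyPartition)
}

val parts = padCoalesce(1000, 20)
println(parts.count(_ != EmptyPartition))  // only 20 partitions do real work
```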



On Mon, Oct 8, 2018, 23:57 Koert Kuipers  wrote:

> although i personally would describe this as a bug, the answer will be that
> this is the intended behavior. the coalesce "infects" the shuffle before
> it, making a coalesce useless for reducing output files after a shuffle
> with many partitions by design.
>
> your only option left is a repartition for which you pay the price in that
> it introduces another expensive shuffle.
>
> interestingly if you do a coalesce on a map-only job it knows how to
> reduce the partitions and output files without introducing a shuffle, so
> clearly it is possible, but i dont know how to get this behavior after a
> shuffle in an existing job.
>
> On Fri, Oct 5, 2018 at 6:34 PM Sergey Zhemzhitsky 
> wrote:
>
>> Hello guys,
>>
>> Currently I'm a little bit confused with coalesce behaviour.
>>
>> Consider the following use case - I'd like to join two pretty big RDDs.
>> To make a join more stable and to prevent it from failing with OOM, the RDDs
>> are usually repartitioned to redistribute data more evenly and to
>> prevent every partition from hitting the 2GB limit. The join then ends up
>> with a lot of partitions.
>>
>> Then after successful join I'd like to save the resulting dataset.
>> But I don't need such a huge amount of files as the number of
>> partitions/tasks during joining. Actually I'm fine with such number of
>> files as the total number of executor cores allocated to the job. So
>> I've considered using a coalesce.
>>
>> The problem is that coalesce with shuffling disabled prevents join
>> from using the specified number of partitions and instead forces join
>> to use the number of partitions provided to coalesce
>>
>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5,
>> false).toDebugString
>> res5: String =
>> (5) CoalescedRDD[15] at coalesce at <console>:25 []
>>  |  MapPartitionsRDD[14] at repartition at <console>:25 []
>>  |  CoalescedRDD[13] at repartition at <console>:25 []
>>  |  ShuffledRDD[12] at repartition at <console>:25 []
>>  +-(20) MapPartitionsRDD[11] at repartition at <console>:25 []
>>     |   ParallelCollectionRDD[10] at makeRDD at <console>:25 []
>>
>> With shuffling enabled everything is ok, e.g.
>>
>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5,
>> true).toDebugString
>> res6: String =
>> (5) MapPartitionsRDD[24] at coalesce at <console>:25 []
>>  |  CoalescedRDD[23] at coalesce at <console>:25 []
>>  |  ShuffledRDD[22] at coalesce at <console>:25 []
>>  +-(100) MapPartitionsRDD[21] at coalesce at <console>:25 []
>>     |   MapPartitionsRDD[20] at repartition at <console>:25 []
>>     |   CoalescedRDD[19] at repartition at <console>:25 []
>>     |   ShuffledRDD[18] at repartition at <console>:25 []
>>     +-(20) MapPartitionsRDD[17] at repartition at <console>:25 []
>>        |   ParallelCollectionRDD[16] at makeRDD at <console>:25 []
>>
>> In that case the problem is that for pretty huge datasets additional
>> reshuffling can take hours or at least comparable amount of time as
>> for the join itself.
>>
>> So I'd like to understand whether it is a bug or just an expected
>> behaviour?
>> In case it is expected is there any way to insert additional
>> ShuffleMapStage into an appropriate position of DAG but without
>> reshuffling itself?
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>


Coalesce behaviour

2018-10-05 Thread Sergey Zhemzhitsky
Hello guys,

Currently I'm a little bit confused with coalesce behaviour.

Consider the following use case - I'd like to join two pretty big RDDs.
To make a join more stable and to prevent it from failing with OOM, the RDDs
are usually repartitioned to redistribute data more evenly and to
prevent every partition from hitting the 2GB limit. The join then ends up
with a lot of partitions.

Then after successful join I'd like to save the resulting dataset.
But I don't need such a huge amount of files as the number of
partitions/tasks during joining. Actually I'm fine with such number of
files as the total number of executor cores allocated to the job. So
I've considered using a coalesce.

The problem is that coalesce with shuffling disabled prevents join
from using the specified number of partitions and instead forces join
to use the number of partitions provided to coalesce

scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5,
false).toDebugString
res5: String =
(5) CoalescedRDD[15] at coalesce at <console>:25 []
 |  MapPartitionsRDD[14] at repartition at <console>:25 []
 |  CoalescedRDD[13] at repartition at <console>:25 []
 |  ShuffledRDD[12] at repartition at <console>:25 []
 +-(20) MapPartitionsRDD[11] at repartition at <console>:25 []
    |   ParallelCollectionRDD[10] at makeRDD at <console>:25 []

With shuffling enabled everything is ok, e.g.

scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5, true).toDebugString
res6: String =
(5) MapPartitionsRDD[24] at coalesce at <console>:25 []
 |  CoalescedRDD[23] at coalesce at <console>:25 []
 |  ShuffledRDD[22] at coalesce at <console>:25 []
 +-(100) MapPartitionsRDD[21] at coalesce at <console>:25 []
    |   MapPartitionsRDD[20] at repartition at <console>:25 []
    |   CoalescedRDD[19] at repartition at <console>:25 []
    |   ShuffledRDD[18] at repartition at <console>:25 []
    +-(20) MapPartitionsRDD[17] at repartition at <console>:25 []
       |   ParallelCollectionRDD[16] at makeRDD at <console>:25 []

In that case the problem is that for pretty huge datasets additional
reshuffling can take hours or at least comparable amount of time as
for the join itself.

So I'd like to understand whether it is a bug or just an expected behaviour?
In case it is expected is there any way to insert additional
ShuffleMapStage into an appropriate position of DAG but without
reshuffling itself?
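The two toDebugString outputs above can be condensed into a toy model in plain Scala. The key rule (stated by Wenchen further up the thread) is that a stage's task count equals the partition count of its last RDD, which is why `coalesce(5, false)` shrinks the whole reduce stage to 5 tasks:

```scala
// Toy model (plain Scala, not Spark) of the two DAGs printed above.
// A stage runs as many tasks as its *last* RDD has partitions.
case class Stage(lastRddPartitions: Int) { def tasks: Int = lastRddPartitions }

// repartition(100).coalesce(5, shuffle = false):
// the coalesce joins the reduce stage, so the 100 reduce tasks never happen
val withoutExtraShuffle = Seq(Stage(20), Stage(5))

// repartition(100).coalesce(5, shuffle = true):
// the coalesce gets its own stage, so the reduce really runs with 100 tasks
val withExtraShuffle = Seq(Stage(20), Stage(100), Stage(5))

println(withoutExtraShuffle.map(_.tasks))
println(withExtraShuffle.map(_.tasks))
```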

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: AccumulatorV2 vs AccumulableParam (V1)

2018-05-04 Thread Sergey Zhemzhitsky
Hi Wenchen,

Thanks a lot for clarification and help.

Here is what I mean regarding the remaining points

For 2: Should we update the documentation [1] regarding custom
accumulators to be more clear and to highlight that
  a) custom accumulators should always override "copy" method to
prevent unexpected behaviour with losing type information
  b) custom accumulators cannot be direct anonymous subclasses of
AccumulatorV2 because of a)
  c) extending already existing accumulators almost always requires
overriding "copy" because of a)

For 3: Here is [2] the sample that shows that the same
AccumulableParam can be registered twice with different names.
Here is [3] the sample that fails with IllegalStateException on this
line [4] because accumulator's metadata is not null and it's hardly
possible to reset it to null (there is no public API for such a
thing).
I understand that Spark creates different Accumulators for the same
AccumulableParam internally, and because AccumulatorV2 is stateful,
using the same stateful accumulator instance in multiple places for
different things is very dangerous, so maybe we should highlight this
point in the documentation too?

For 5: Should we raise a JIRA for that?


[1] https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators
[2] 
https://gist.github.com/szhem/52a26ada4bbeb1a3e762710adc3f94ef#file-accumulatorsspec-scala-L36
[3] 
https://gist.github.com/szhem/52a26ada4bbeb1a3e762710adc3f94ef#file-accumulatorsspec-scala-L59
[4] 
https://github.com/apache/spark/blob/4d5de4d303a773b1c18c350072344bd7efca9fc4/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L51


Kind Regards,
Sergey

On Thu, May 3, 2018 at 5:20 PM, Wenchen Fan  wrote:
> Hi Sergey,
>
> Thanks for your valuable feedback!
>
> For 1: yea this is definitely a bug and I have sent a PR to fix it.
> For 2: I have left my comments on the JIRA ticket.
> For 3: I don't quite understand it, can you give some concrete examples?
> For 4: yea this is a problem, but I think it's not a big deal, and we
> couldn't find a better solution at that time.
> For 5: I think this is a real problem. It looks to me that we can merge
> `isZero`, `copyAndReset`, `copy`, `reset` into one API: `zero`, which is
> basically just the `copyAndReset`. If there is a way to fix this without
> breaking the existing API, I'm really happy to do it.
> For 6: same as 4. It's a problem but not a big deal.
>
> In general, I think accumulator v2 sacrifices some flexibility to simplify
> the framework and improve the performance. Users can still use accumulator
> v1 if flexibility is more important to them. We can keep improving
> accumulator v2 without breaking backward compatibility.
>
> Thanks,
> Wenchen
>
> On Thu, May 3, 2018 at 6:20 AM, Sergey Zhemzhitsky 
> wrote:
>>
>> Hello guys,
>>
>> I've started to migrate my Spark jobs which use Accumulators V1 to
>> AccumulatorV2 and faced with the following issues:
>>
>> 1. LegacyAccumulatorWrapper now requires the resulting type of
>> AccumulableParam to implement equals. Otherwise the
>> AccumulableParam, automatically wrapped into LegacyAccumulatorWrapper,
>> will fail with an AssertionError (SPARK-23697 [1]).
>>
>> 2. Existing AccumulatorV2 classes are difficult to extend
>> easily and correctly (SPARK-24154 [2]) due to the "copy" method, which
>> is called during serialization and usually loses the type information of
>> descendant classes which don't override "copy" (and it's easier to
>> implement an accumulator from scratch than to override it correctly)
>>
>> 3. The same instance of AccumulatorV2 cannot be used with the same
>> SparkContext multiple times (unlike AccumulableParam) failing with
>> "IllegalStateException: Cannot register an Accumulator twice" even
>> after "reset" method called. So it's impossible to unregister already
>> registered accumulator from user code.
>>
>> 4. AccumulableParam (V1) implementations are usually more or less
>> stateless, while AccumulatorV2 implementations are almost always
>> stateful, leading to (unnecessary?) type checks (unlike
>> AccumulableParam). For example typical "merge" method of AccumulatorV2
>> requires to check whether current accumulator is of an appropriate
>> type, like here [3]
>>
>> 5. AccumulatorV2 is more difficult to implement correctly than
>> AccumulableParam. For example, in the case of AccumulableParam I have to
>> implement just 3 methods (addAccumulator, addInPlace, zero), in the case
>> of AccumulatorParam - just 2 methods (addInPlace, zero), and in the case of
>> AccumulatorV2 - 6 methods (isZero, copy, reset, add, merge, value)

AccumulatorV2 vs AccumulableParam (V1)

2018-05-02 Thread Sergey Zhemzhitsky
Hello guys,

I've started to migrate my Spark jobs which use Accumulators V1 to
AccumulatorV2 and faced with the following issues:

1. LegacyAccumulatorWrapper now requires the resulting type of
AccumulableParam to implement equals. Otherwise the
AccumulableParam, automatically wrapped into LegacyAccumulatorWrapper,
will fail with an AssertionError (SPARK-23697 [1]).

2. Existing AccumulatorV2 classes are difficult to extend
easily and correctly (SPARK-24154 [2]) due to the "copy" method, which
is called during serialization and usually loses the type information of
descendant classes which don't override "copy" (and it's easier to
implement an accumulator from scratch than to override it correctly)

3. The same instance of AccumulatorV2 cannot be used with the same
SparkContext multiple times (unlike AccumulableParam) failing with
"IllegalStateException: Cannot register an Accumulator twice" even
after "reset" method called. So it's impossible to unregister already
registered accumulator from user code.

4. AccumulableParam (V1) implementations are usually more or less
stateless, while AccumulatorV2 implementations are almost always
stateful, leading to (unnecessary?) type checks. For example, the
typical "merge" method of AccumulatorV2 requires checking whether the
current accumulator is of an appropriate type, like here [3]

5. AccumulatorV2 is more difficult to implement correctly than
AccumulableParam. For example, in the case of AccumulableParam I have to
implement just 3 methods (addAccumulator, addInPlace, zero), in the case
of AccumulatorParam - just 2 methods (addInPlace, zero), and in the case of
AccumulatorV2 - 6 methods (isZero, copy, reset, add, merge, value)

6. AccumulatorV2 classes can hardly be anonymous classes,
because their "copy" and "merge" methods typically require a
concrete class to make a type check.

I understand the motivation for AccumulatorV2 (SPARK-14654 [4]), but
just wondering whether there is a way to simplify the API of
AccumulatorV2 to meet the points described above and to be less error
prone?


[1] https://issues.apache.org/jira/browse/SPARK-23697
[2] https://issues.apache.org/jira/browse/SPARK-24154
[3] 
https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L348
[4] https://issues.apache.org/jira/browse/SPARK-14654
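Points 2, 4 and 5 can be illustrated with a stand-in trait in plain Scala. This is NOT Spark's real AccumulatorV2 (which needs Spark on the classpath); `AccV2` and `LongSum` are made-up names that merely mirror the 6-method surface the message describes:

```scala
// Stand-in for the 6-method AccumulatorV2 surface from point 5.
trait AccV2[IN, OUT] {
  def isZero: Boolean
  def copy(): AccV2[IN, OUT]
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: AccV2[IN, OUT]): Unit
  def value: OUT
}

class LongSum extends AccV2[Long, Long] {
  private var sum = 0L
  def isZero: Boolean = sum == 0L
  // point 2: a subclass must override copy() itself - an inherited copy()
  // would return the parent type and silently drop the subclass state
  def copy(): LongSum = { val c = new LongSum; c.sum = sum; c }
  def reset(): Unit = sum = 0L
  def add(v: Long): Unit = sum += v
  // point 4: the stateful API forces a runtime type check on merge
  def merge(other: AccV2[Long, Long]): Unit = other match {
    case o: LongSum => sum += o.sum
    case _ => throw new UnsupportedOperationException(s"cannot merge $other")
  }
  def value: Long = sum
}

val acc = new LongSum
(1L to 10L).foreach(acc.add)
val partial = acc.copy()
acc.merge(partial)
println(acc.value)
```

The pattern match in `merge` is exactly the kind of type check a stateless V1 AccumulableParam never needed, and a concrete class name is required for it - which is also why anonymous subclasses (point 6) are awkward.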

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Accumulators of Spark 1.x no longer work with Spark 2.x

2018-03-15 Thread Sergey Zhemzhitsky
One more option is to override writeReplace [1] in
LegacyAccumulatorWrapper to prevent such failures.

What do you think?

[1] 
https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L158

On Fri, Mar 16, 2018 at 12:55 AM, Sergey Zhemzhitsky  wrote:
> Hi there,
>
> I've noticed that accumulators of Spark 1.x no longer work with Spark
> 2.x failing with
> java.lang.AssertionError: assertion failed: copyAndReset must return a
> zero value copy
>
> It happens while serializing an accumulator here [1] although
> copyAndReset returns zero-value copy for sure, just consider the
> accumulator below
>
> val concatParam = new AccumulatorParam[jl.StringBuilder] {
>   override def zero(initialValue: jl.StringBuilder): jl.StringBuilder
> = new jl.StringBuilder()
>   override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder):
> jl.StringBuilder = r1.append(r2)
> }
>
> So, Spark treats zero value as non-zero due to how isZero [2] is
> implemented in LegacyAccumulatorWrapper
>
> override def isZero: Boolean = _value == param.zero(initialValue)
>
> All this means, that the values to be accumulated must implement
> equals and hashCode, otherwise isZero is very likely to always return
> false.
>
> So I'm wondering why this assertion is necessary and whether it can be
> safely removed from there?
>
> [1] 
> https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165
> [2] 
> https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489
> [3] https://issues.apache.org/jira/browse/SPARK-23697

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Accumulators of Spark 1.x no longer work with Spark 2.x

2018-03-15 Thread Sergey Zhemzhitsky
Hi there,

I've noticed that accumulators of Spark 1.x no longer work with Spark
2.x failing with
java.lang.AssertionError: assertion failed: copyAndReset must return a
zero value copy

It happens while serializing an accumulator here [1] although
copyAndReset returns zero-value copy for sure, just consider the
accumulator below

val concatParam = new AccumulatorParam[jl.StringBuilder] {
  override def zero(initialValue: jl.StringBuilder): jl.StringBuilder
= new jl.StringBuilder()
  override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder):
jl.StringBuilder = r1.append(r2)
}

So, Spark treats zero value as non-zero due to how isZero [2] is
implemented in LegacyAccumulatorWrapper

override def isZero: Boolean = _value == param.zero(initialValue)

All this means that the values to be accumulated must implement
equals and hashCode, otherwise isZero is very likely to always return
false.

So I'm wondering why this assertion is necessary and whether it can be
safely removed from there?

[1] 
https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165
[2] 
https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489
[3] https://issues.apache.org/jira/browse/SPARK-23697
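The equals/hashCode point can be reproduced in plain Scala without Spark: java.lang.StringBuilder does not override equals, so a check of the form `_value == param.zero(initialValue)` compares references, and an "empty" accumulator never looks zero:

```scala
// Two freshly created, equal-content zero values...
val zeroA = new java.lang.StringBuilder()
val zeroB = new java.lang.StringBuilder()

// ...that are never == because StringBuilder inherits Object.equals
// (reference equality) - the comparison LegacyAccumulatorWrapper relies on.
val looksZero = zeroA == zeroB
val sameContent = zeroA.toString == zeroB.toString

println(s"looksZero=$looksZero sameContent=$sameContent")
```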

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



RDD checkpoint failures in case of insufficient memory

2018-03-14 Thread Sergey Zhemzhitsky
Hi there,

A while ago, running GraphX jobs, I discovered that
PeriodicRDDCheckpointer fails with FileNotFoundExceptions in case of
insufficient memory resources.

I believe that any iterative job which uses PeriodicRDDCheckpointer
(like ML) suffers from the same issue (but it's not visible enough
because of the RAM size of modern servers).

So, I've raised the JIRA issues with the corresponding pull requests to fix them
- https://issues.apache.org/jira/browse/SPARK-22150
- https://issues.apache.org/jira/browse/SPARK-22184
- https://github.com/apache/spark/pull/19373
- https://github.com/apache/spark/pull/19410

Could anyone please look through these PRs?

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org