Spark Streaming: Async action scheduling inside foreachRDD

2017-08-02 Thread Andrii Biletskyi
Hi all,

What is the correct way to schedule multiple jobs inside the foreachRDD method
and, importantly, await their results to ensure those jobs have completed
successfully?
E.g.:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

kafkaDStream.foreachRDD { rdd =>
  val rdd1 = rdd.map(...)
  val rdd2 = rdd1.map(...)

  val job1Future = Future {
    rdd1.saveToCassandra(...)
  }

  val job2Future = Future {
    rdd1.foreachPartition(iter => /* save to Kafka */)
  }

  // Future.sequence expects a collection of futures
  Await.result(
    Future.sequence(Seq(job1Future, job2Future)),
    Duration.Inf)

  // commit Kafka offsets
}

In this code I'm scheduling two actions in futures and awaiting them. I
need to be sure, when I commit Kafka offsets at the end of the batch
processing, that job1 and job2 have actually executed successfully. Does the
given approach provide these guarantees, i.e. if one of the jobs fails, will
the entire batch be marked as failed too?
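
For reference, a minimal, Spark-free sketch of the semantics relied on here
(all names are illustrative): Future.sequence fails as soon as any member
future fails, and Await.result rethrows that failure, so inside foreachRDD the
exception propagates and the offset commit that follows is never reached.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

object AwaitSemantics extends App {
  // Stand-ins for the two jobs above, e.g. the Cassandra save and the Kafka write.
  val job1 = Future { "cassandra write ok" }
  val job2 = Future[String] { throw new RuntimeException("kafka write failed") }

  // Future.sequence fails if either future fails; Await.result rethrows that
  // failure, so any code placed after the Await (the offset commit) does not run.
  Try(Await.result(Future.sequence(Seq(job1, job2)), Duration.Inf)) match {
    case Success(results) => println(s"both jobs succeeded: $results") // safe to commit offsets
    case Failure(e)       => println(s"batch must fail, do not commit offsets: ${e.getMessage}")
  }
}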


Thanks,
Andrii


Re: Impact of coalesce operation before writing dataframe

2017-05-23 Thread Andrii Biletskyi
Ah, that's right. I didn't mention it: I have 10 executors in my cluster,
so when I do .coalesce(10) right before saving the orc to S3, does coalescing
really affect parallelism? To me it looks like it doesn't, because we went from
100 tasks executed in parallel by 10 executors to 10 tasks executed by the same
10 executors. Now, I understand that there may be some data skew resulting in
uneven partitions, but that's not really my case (according to the Spark UI).
Again, I'm trying to understand first of all how coalescing the dataset impacts
executor memory, GC, etc. Maybe if coalesce is done before writing the dataset,
each of the resulting partitions needs to be evaluated and thus stored in
memory? Just a guess.
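
For concreteness, a hedged sketch of the two variants being compared (paths,
the join key and the partition counts are placeholders, not from the actual job):

import org.apache.spark.sql.SparkSession

object CoalesceVsRepartition {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("coalesce-vs-repartition").getOrCreate()

    val wide  = spark.read.orc("s3://bucket/wide-input")          // the ~300 GB DataFrame
    val small = spark.read.orc("s3://bucket/small-input").cache() // the ~20 GB DataFrame

    val joined = wide.join(small, Seq("id"))  // 200 shuffle partitions by default

    // coalesce is a narrow dependency: the post-shuffle stage (the join itself)
    // runs with only 10 tasks, each holding ~20 shuffle partitions' worth of rows,
    // which raises per-task memory and GC pressure even with 10 executors.
    joined.coalesce(10).write.orc("s3://bucket/out-coalesce-10")

    // repartition adds one extra shuffle, but the join keeps its 200-way
    // parallelism and only the final write runs with 100 tasks.
    joined.repartition(100).write.orc("s3://bucket/out-repartition-100")

    spark.stop()
  }
}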

Andrii

2017-05-23 23:42 GMT+03:00 John Compitello <jo...@broadinstitute.org>:

> Spark is doing operations on each partition in parallel. If you decrease the
> number of partitions, you’re potentially doing less work in parallel,
> depending on your cluster setup.
>
> On May 23, 2017, at 4:23 PM, Andrii Biletskyi
> <andrii.bilets...@yahoo.com.invalid> wrote:
>
>
> No, I didn't try to use repartition. How exactly does it impact the
> parallelism?
> In my understanding coalesce simply "unions" multiple partitions located
> on the same executor, "one on top of the other", while repartition does a
> hash-based shuffle, decreasing the number of output partitions. So how
> exactly does this affect the parallelism, and at which stage of the job?
>
> Thanks,
> Andrii
>
>
>
> On Tuesday, May 23, 2017 10:20 PM, Michael Armbrust <
> mich...@databricks.com> wrote:
>
>
> coalesce is nice because it does not shuffle, but the consequence of
> avoiding a shuffle is that it will also reduce the parallelism of the
> preceding computation. Have you tried using repartition instead?
>
> On Tue, May 23, 2017 at 12:14 PM, Andrii Biletskyi <
> andrii.bilets...@yahoo.com.invalid> wrote:
>
> Hi all,
>
> I'm trying to understand the impact of coalesce operation on spark job
> performance.
>
> As a side note: we are using emrfs (i.e. AWS S3) as the source and target
> for the job.
>
> Omitting unnecessary details, the job can be described as: join a 200M-record
> DataFrame stored in orc format on emrfs with another 200M-record cached
> DataFrame, and put the result of the join back to emrfs. The first DF is a
> set of wide rows (Spark UI shows 300 GB) and the second is relatively small
> (Spark shows 20 GB).
>
> I have enough resources in my cluster to perform the job, but I don't like
> the fact that the output datasource contains 200 part orc files (as
> spark.sql.shuffle.partitions defaults to 200), so before saving the orc to
> emrfs I'm doing .coalesce(10). From the documentation coalesce looks like a
> quite harmless operation: no repartitioning etc.
>
> But with such a setup my job fails to write the dataset on the last stage.
> Right now the error is OOM: GC overhead. When I change .coalesce(10) to
> .coalesce(100) the job runs much faster and finishes without errors.
>
> So what's the impact of .coalesce in this case? And how can I do in-place
> concatenation of files (not involving Hive) to end up with a smaller number
> of bigger files? With .coalesce(100) the job generates 100 orc snappy-encoded
> files of ~300MB each.
>
> Thanks,
> Andrii
>
>
>
>
>
>


Re: Impact of coalesce operation before writing dataframe

2017-05-23 Thread Andrii Biletskyi
No, I didn't try to use repartition. How exactly does it impact the parallelism?
In my understanding coalesce simply "unions" multiple partitions located on the
same executor, "one on top of the other", while repartition does a hash-based
shuffle, decreasing the number of output partitions. So how exactly does this
affect the parallelism, and at which stage of the job?
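
For reference, a quick way to see in the physical plan where each operation
lands; everything below is illustrative and not from the actual job:

import org.apache.spark.sql.SparkSession

object PlanComparison extends App {
  val spark = SparkSession.builder().master("local[*]").appName("plan-comparison").getOrCreate()
  import spark.implicits._

  // A grouped DataFrame whose shuffle produces spark.sql.shuffle.partitions (200) partitions.
  val grouped = spark.range(0, 1000000).toDF("id").groupBy($"id" % 200).count()

  // No new Exchange appears: the coalesce collapses the post-shuffle stage itself to 10 tasks.
  grouped.coalesce(10).explain()

  // An extra Exchange appears: the aggregation keeps 200 tasks, only the output
  // is reshuffled into 10 partitions.
  grouped.repartition(10).explain()

  spark.stop()
}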

Thanks,
Andrii

2017-05-23 22:19 GMT+03:00 Michael Armbrust <mich...@databricks.com>:

> coalesce is nice because it does not shuffle, but the consequence of
> avoiding a shuffle is that it will also reduce the parallelism of the
> preceding computation. Have you tried using repartition instead?
>
> On Tue, May 23, 2017 at 12:14 PM, Andrii Biletskyi <
> andrii.bilets...@yahoo.com.invalid> wrote:
>
>> Hi all,
>>
>> I'm trying to understand the impact of coalesce operation on spark job
>> performance.
>>
>> As a side note: we are using emrfs (i.e. AWS S3) as the source and target
>> for the job.
>>
>> Omitting unnecessary details, the job can be described as: join a 200M-record
>> DataFrame stored in orc format on emrfs with another 200M-record cached
>> DataFrame, and put the result of the join back to emrfs. The first DF is a
>> set of wide rows (Spark UI shows 300 GB) and the second is relatively small
>> (Spark shows 20 GB).
>>
>> I have enough resources in my cluster to perform the job, but I don't like
>> the fact that the output datasource contains 200 part orc files (as
>> spark.sql.shuffle.partitions defaults to 200), so before saving the orc to
>> emrfs I'm doing .coalesce(10). From the documentation coalesce looks like a
>> quite harmless operation: no repartitioning etc.
>>
>> But with such a setup my job fails to write the dataset on the last stage.
>> Right now the error is OOM: GC overhead. When I change .coalesce(10) to
>> .coalesce(100) the job runs much faster and finishes without errors.
>>
>> So what's the impact of .coalesce in this case? And how can I do in-place
>> concatenation of files (not involving Hive) to end up with a smaller number
>> of bigger files? With .coalesce(100) the job generates 100 orc snappy-encoded
>> files of ~300MB each.
>>
>> Thanks,
>> Andrii
>>
>
>


Impact of coalesce operation before writing dataframe

2017-05-23 Thread Andrii Biletskyi
Hi all,

I'm trying to understand the impact of coalesce operation on spark job
performance.

As a side note: we are using emrfs (i.e. AWS S3) as the source and target
for the job.

Omitting unnecessary details, the job can be described as: join a 200M-record
DataFrame stored in orc format on emrfs with another 200M-record cached
DataFrame, and put the result of the join back to emrfs. The first DF is a set
of wide rows (Spark UI shows 300 GB) and the second is relatively small (Spark
shows 20 GB).

I have enough resources in my cluster to perform the job, but I don't like
the fact that the output datasource contains 200 part orc files (as
spark.sql.shuffle.partitions defaults to 200), so before saving the orc to
emrfs I'm doing .coalesce(10). From the documentation coalesce looks like a
quite harmless operation: no repartitioning etc.

But with such a setup my job fails to write the dataset on the last stage.
Right now the error is OOM: GC overhead. When I change .coalesce(10) to
.coalesce(100) the job runs much faster and finishes without errors.

So what's the impact of .coalesce in this case? And how can I do in-place
concatenation of files (not involving Hive) to end up with a smaller number of
bigger files? With .coalesce(100) the job generates 100 orc snappy-encoded
files of ~300MB each.
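
For completeness, a hedged sketch of one alternative that follows from the
spark.sql.shuffle.partitions observation above: lower the shuffle partition
count so the join itself produces the desired number of output files. Paths and
the join key are placeholders, and note this also caps the join's parallelism.

import org.apache.spark.sql.SparkSession

object FewerShufflePartitions {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("fewer-shuffle-partitions").getOrCreate()

    // The join's shuffle now produces 100 partitions instead of the default 200,
    // so the write emits ~100 larger part files without any coalesce/repartition.
    // Caveat: the join itself now also runs with only 100 tasks.
    spark.conf.set("spark.sql.shuffle.partitions", "100")

    val joined = spark.read.orc("s3://bucket/wide-input")
      .join(spark.read.orc("s3://bucket/small-input").cache(), Seq("id"))

    joined.write.orc("s3://bucket/output")

    spark.stop()
  }
}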

Thanks,
Andrii


Re: MapWithState partitioning

2016-10-31 Thread Andrii Biletskyi
Thanks,

As I understand it, for the Kafka case the way to do this is to define my own
kafka.Partitioner that is used when data is produced to Kafka, and simply
reuse the same partitioning logic as a spark.Partitioner in the mapWithState
StateSpec.

I think I'll stick with that.
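
For the record, a minimal sketch of the idea (the state function, types and the
partition count are placeholders; the getPartition logic has to mirror whatever
kafka.Partitioner the producers actually use):

import org.apache.spark.Partitioner
import org.apache.spark.streaming.{State, StateSpec}

// A Spark Partitioner that mirrors the producers' Kafka partitioning scheme.
// Note: the default Kafka producer partitioner uses murmur2(keyBytes) % numPartitions,
// so a plain hashCode-based version like this one is only correct if the producers
// use an equivalent custom kafka.Partitioner.
class KafkaMirrorPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = math.abs(key.hashCode % partitions)
}

// Placeholder state function: counts messages per key.
def updateState(key: String, value: Option[String], state: State[Long]): Option[(String, Long)] = {
  val count = state.getOption.getOrElse(0L) + 1L
  state.update(count)
  Some(key -> count)
}

val numKafkaPartitions = 12 // placeholder: the topic's partition count

val spec = StateSpec
  .function(updateState _)
  .partitioner(new KafkaMirrorPartitioner(numKafkaPartitions))

// keyValueStream is the DStream[(String, String)] produced from the Kafka stream.
val stateful = keyValueStream.mapWithState(spec)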

Thanks,
Andrii

2016-10-31 16:55 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> You may know that those streams share the same keys, but Spark doesn't
> unless you tell it.
>
> mapWithState takes a StateSpec, which should allow you to specify a
> partitioner.
>
> On Mon, Oct 31, 2016 at 9:40 AM, Andrii Biletskyi <andrb...@gmail.com>
> wrote:
> > Thanks for the response,
> >
> > So as I understand there is no way to "tell" mapWithState to leave the
> > partitioning schema alone, as any other transformation would normally do.
> > Then I would like to clarify whether there is a simple way to do a
> > transformation to a key-value stream and somehow specify the Partitioner
> > so that it effectively results in the same partitioning schema as the
> > original stream.
> > I.e.:
> >
> > stream.mapPartitions { crs =>
> >   crs.map { cr =>
> >     cr.key() -> cr.value()
> >   }
> > } <--- specify the Partitioner here somehow for the resulting rdd
> >
> >
> > The reason I ask is that it simply looks strange to me that Spark will
> > have to shuffle my input stream and the "state" stream on every batch
> > during the mapWithState operation, when I know for sure that those two
> > streams will always share the same keys and will not need access to other
> > partitions.
> >
> > Thanks,
> > Andrii
> >
> >
> > 2016-10-31 15:45 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> >>
> >> If you call a transformation on an rdd using the same partitioner as that
> >> rdd, no shuffle will occur. KafkaRDD doesn't have a partitioner; there's no
> >> consistent partitioning scheme that works for all Kafka uses. You can wrap
> >> each KafkaRDD with an rdd that has a custom partitioner that you write to
> >> match your Kafka partitioning scheme, and avoid a shuffle.
> >>
> >> The danger there is if you have any misbehaving producers, or translate
> >> the partitioning wrongly, you'll get bad results. It's safer just to
> >> shuffle.
> >>
> >>
> >> On Oct 31, 2016 04:31, "Andrii Biletskyi"
> >> <andrii.bilets...@yahoo.com.invalid> wrote:
> >>
> >> Hi all,
> >>
> >> I'm using Spark Streaming mapWithState operation to do a stateful
> >> operation on my Kafka stream (though I think similar arguments would
> apply
> >> for any source).
> >>
> >> Trying to understand a way to control mapWithState's partitioning
> schema.
> >>
> >> My transformations are simple:
> >>
> >> 1) create KafkaDStream
> >> 2) mapPartitions to get a key-value stream where `key` corresponds to
> >> Kafka message key
> >> 3) apply the mapWithState operation on the key-value stream; the state
> >> stream shares keys with the original stream, and the resulting stream
> >> doesn't change keys either
> >>
> >> The problem is that, as I understand, the mapWithState stream has a
> >> different partitioning schema and thus I see shuffles in the Spark Web UI.
> >>
> >> From the mapWithState implementation I see that mapWithState uses the
> >> Partitioner if one is specified, and otherwise partitions data with a
> >> HashPartitioner(). The thing is that the original KafkaDStream has a
> >> specific partitioning schema: Kafka partitions correspond to Spark RDD
> >> partitions.
> >>
> >> Question: is there a way for the mapWithState stream to inherit the
> >> partitioning schema from the original stream (i.e. to correspond to Kafka
> >> partitions)?
> >>
> >> Thanks,
> >> Andrii
> >>
> >>
> >
>


MapWithState partitioning

2016-10-31 Thread Andrii Biletskyi
Hi all,

I'm using the Spark Streaming mapWithState operation to do a stateful operation
on my Kafka stream (though I think similar arguments would apply to any
source).

I'm trying to understand a way to control mapWithState's partitioning schema.

My transformations are simple:

1) create KafkaDStream
2) mapPartitions to get a key-value stream where `key` corresponds to Kafka
message key
3) apply the mapWithState operation on the key-value stream; the state stream
shares keys with the original stream, and the resulting stream doesn't change
keys either

The problem is that, as I understand, the mapWithState stream has a different
partitioning schema and thus I see shuffles in the Spark Web UI.

From the mapWithState implementation I see that mapWithState uses the
Partitioner if one is specified, and otherwise partitions data with a
HashPartitioner(). The thing is that the original KafkaDStream has a specific
partitioning schema: Kafka partitions correspond to Spark RDD partitions.

Question: is there a way for the mapWithState stream to inherit the partitioning
schema from the original stream (i.e. to correspond to Kafka partitions)?
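
For context, a compressed sketch of steps 2 and 3 (kafkaDStream stands for the
direct stream from step 1, and updateState is a placeholder
(key, Option[value], State) => output function):

import org.apache.spark.streaming.StateSpec

// Step 2: keep the Kafka message key as the Spark key. The resulting DStream's
// RDDs still have one partition per Kafka partition, but no Partitioner is set.
val keyValueStream = kafkaDStream.mapPartitions { crs =>
  crs.map(cr => cr.key() -> cr.value())
}

// Step 3: with no partitioner given in the StateSpec, mapWithState falls back
// to a HashPartitioner (as noted above), which does not line up with the
// Kafka-partition-per-RDD-partition layout, hence the shuffle seen in the UI.
val stateful = keyValueStream.mapWithState(StateSpec.function(updateState _))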

Thanks,
Andrii