Spark Streaming: Async action scheduling inside foreachRDD
Hi all,

What is the correct way to schedule multiple jobs inside the foreachRDD method and, importantly, await their results to ensure those jobs have completed successfully? E.g.:

    kafkaDStream.foreachRDD { rdd =>
      val rdd1 = rdd.map(...)
      val rdd2 = rdd1.map(...)

      val job1Future = Future {
        rdd1.saveToCassandra(...)
      }
      val job2Future = Future {
        rdd2.foreachPartition(iter => /* save to Kafka */)
      }

      // Note: Future.sequence takes a collection of futures
      Await.result(
        Future.sequence(Seq(job1Future, job2Future)),
        Duration.Inf)

      // commit Kafka offsets
    }

In this code I'm scheduling two actions in futures and awaiting them. I need to be sure, when I commit Kafka offsets at the end of the batch processing, that job1 and job2 have actually executed successfully. Does the given approach provide these guarantees? I.e. in case one of the jobs fails, will the entire batch be marked as failed too?

Thanks,
Andrii
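For intuition on the guarantee being asked about: Await.result rethrows the first failure of the awaited future, so an exception in either job aborts the foreachRDD body before the offset commit is reached. Below is a minimal Spark-free sketch of the same Future/Await pattern; runBatch and the job thunks are hypothetical stand-ins for the two Spark actions, not anything from the Spark API:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.util.Try

// Run two "jobs" concurrently and await both; the thunks stand in for
// the saveToCassandra / foreachPartition actions scheduled in foreachRDD.
def runBatch(job1: () => Unit, job2: () => Unit): Try[Unit] = Try {
  val f1 = Future(job1())
  val f2 = Future(job2())
  // Future.sequence takes a collection of futures, not varargs.
  Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
  // This point is only reached if both jobs succeeded -
  // the safe place to commit offsets.
}

val ok     = runBatch(() => (), () => ())                     // both succeed
val failed = runBatch(() => (), () => sys.error("job2 died")) // failure propagates
```

Because Await.result rethrows the failed future's exception, the function passed to foreachRDD throws, the output operation fails, and the batch is marked as failed (and may be retried, so the writes should be idempotent).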
Re: Impact of coalesce operation before writing dataframe
Ah, that's right. I didn't mention it: I have 10 executors in my cluster, so when I do .coalesce(10) and save orc to s3 right after that - does coalescing really affect parallelism? To me it looks like no, because we went from 100 tasks executed in parallel by 10 executors to 10 tasks executed by the same 10 executors.

Now, I understand that there may be some data skew that results in uneven partitions, but that's not really my case (according to Spark UI). Again, I'm trying to understand first of all how coalescing a dataset impacts executor memory, gc etc. Maybe if coalesce is done before writing the dataset, each of the resulting partitions needs to be evaluated and thus stored in memory? - just a guess.

Andrii

2017-05-23 23:42 GMT+03:00 John Compitello <jo...@broadinstitute.org>:
> Spark is doing operations on each partition in parallel. If you decrease
> number of partitions, you're potentially doing less work in parallel
> depending on your cluster setup.
>
> On May 23, 2017, at 4:23 PM, Andrii Biletskyi <andrii.bilets...@yahoo.com.invalid> wrote:
>
> No, I didn't try to use repartition, how exactly does it impact the parallelism?
> In my understanding coalesce simply "unions" multiple partitions located
> on the same executor "one on top of the other", while repartition does a
> hash-based shuffle changing the number of output partitions. So how exactly
> does this affect the parallelism, and which stage of the job?
>
> Thanks,
> Andrii
>
> On Tuesday, May 23, 2017 10:20 PM, Michael Armbrust <mich...@databricks.com> wrote:
>
> coalesce is nice because it does not shuffle, but the consequence of
> avoiding a shuffle is it will also reduce parallelism of the preceding
> computation. Have you tried using repartition instead?
>
> On Tue, May 23, 2017 at 12:14 PM, Andrii Biletskyi <andrii.bilets...@yahoo.com.invalid> wrote:
>
> Hi all,
>
> I'm trying to understand the impact of the coalesce operation on spark job
> performance.
>
> As a side note: we are using emrfs (i.e. aws s3) as source and target
> for the job.
>
> Omitting unnecessary details, the job can be explained as: join a 200M records
> Dataframe stored in orc format on emrfs with another 200M records cached
> Dataframe, and put the result of the join back to emrfs. The first DF is a set of
> wide rows (Spark UI shows 300 GB) and the second is relatively small (Spark
> shows 20 GB).
>
> I have enough resources in my cluster to perform the job but I don't like
> the fact that the output datasource contains 200 part orc files (as
> spark.sql.shuffle.partitions defaults to 200), so before saving orc to
> emrfs I'm doing .coalesce(10). From the documentation coalesce looks like a
> quite harmless operation: no repartitioning etc.
>
> But with such a setup my job fails to write the dataset on the last stage.
> Right now the error is OOM: GC overhead. When I change .coalesce(10) to
> .coalesce(100) the job runs much faster and finishes without errors.
>
> So what's the impact of .coalesce in this case? And how to do in-place
> concatenation of files (not involving hive) to end up with a smaller number
> of bigger files, as with .coalesce(100) the job generates 100 orc snappy
> encoded files of ~300MB each.
>
> Thanks,
> Andrii
Re: Impact of coalesce operation before writing dataframe
No, I didn't try to use repartition, how exactly does it impact the parallelism? In my understanding coalesce simply "unions" multiple partitions located on the same executor "one on top of the other", while repartition does a hash-based shuffle changing the number of output partitions. So how exactly does this affect the parallelism, and which stage of the job?

Thanks,
Andrii

On Tuesday, May 23, 2017 10:20 PM, Michael Armbrust <mich...@databricks.com> wrote:

coalesce is nice because it does not shuffle, but the consequence of avoiding a shuffle is it will also reduce parallelism of the preceding computation. Have you tried using repartition instead?

On Tue, May 23, 2017 at 12:14 PM, Andrii Biletskyi <andrii.bilets...@yahoo.com.invalid> wrote:

Hi all,

I'm trying to understand the impact of the coalesce operation on spark job performance.

As a side note: we are using emrfs (i.e. aws s3) as source and target for the job.

Omitting unnecessary details, the job can be explained as: join a 200M records Dataframe stored in orc format on emrfs with another 200M records cached Dataframe, and put the result of the join back to emrfs. The first DF is a set of wide rows (Spark UI shows 300 GB) and the second is relatively small (Spark shows 20 GB).

I have enough resources in my cluster to perform the job but I don't like the fact that the output datasource contains 200 part orc files (as spark.sql.shuffle.partitions defaults to 200), so before saving orc to emrfs I'm doing .coalesce(10). From the documentation coalesce looks like a quite harmless operation: no repartitioning etc.

But with such a setup my job fails to write the dataset on the last stage. Right now the error is OOM: GC overhead. When I change .coalesce(10) to .coalesce(100) the job runs much faster and finishes without errors.

So what's the impact of .coalesce in this case? And how to do in-place concatenation of files (not involving hive) to end up with a smaller number of bigger files, as with .coalesce(100) the job generates 100 orc snappy encoded files of ~300MB each.

Thanks,
Andrii
Re: Impact of coalesce operation before writing dataframe
No, I didn't try to use repartition, how exactly does it impact the parallelism? In my understanding coalesce simply "unions" multiple partitions located on the same executor "one on top of the other", while repartition does a hash-based shuffle changing the number of output partitions. So how exactly does this affect the parallelism, and which stage of the job?

Thanks,
Andrii

2017-05-23 22:19 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
> coalesce is nice because it does not shuffle, but the consequence of
> avoiding a shuffle is it will also reduce parallelism of the preceding
> computation. Have you tried using repartition instead?
>
> On Tue, May 23, 2017 at 12:14 PM, Andrii Biletskyi <andrii.bilets...@yahoo.com.invalid> wrote:
>
>> Hi all,
>>
>> I'm trying to understand the impact of the coalesce operation on spark job
>> performance.
>>
>> As a side note: we are using emrfs (i.e. aws s3) as source and target
>> for the job.
>>
>> Omitting unnecessary details, the job can be explained as: join a 200M records
>> Dataframe stored in orc format on emrfs with another 200M records cached
>> Dataframe, and put the result of the join back to emrfs. The first DF is a set of
>> wide rows (Spark UI shows 300 GB) and the second is relatively small (Spark
>> shows 20 GB).
>>
>> I have enough resources in my cluster to perform the job but I don't like
>> the fact that the output datasource contains 200 part orc files (as
>> spark.sql.shuffle.partitions defaults to 200), so before saving orc to
>> emrfs I'm doing .coalesce(10). From the documentation coalesce looks like a
>> quite harmless operation: no repartitioning etc.
>>
>> But with such a setup my job fails to write the dataset on the last stage.
>> Right now the error is OOM: GC overhead. When I change .coalesce(10) to
>> .coalesce(100) the job runs much faster and finishes without errors.
>>
>> So what's the impact of .coalesce in this case? And how to do in-place
>> concatenation of files (not involving hive) to end up with a smaller number
>> of bigger files, as with .coalesce(100) the job generates 100 orc snappy
>> encoded files of ~300MB each.
>>
>> Thanks,
>> Andrii
Impact of coalesce operation before writing dataframe
Hi all,

I'm trying to understand the impact of the coalesce operation on spark job performance.

As a side note: we are using emrfs (i.e. aws s3) as source and target for the job.

Omitting unnecessary details, the job can be explained as: join a 200M records Dataframe stored in orc format on emrfs with another 200M records cached Dataframe, and put the result of the join back to emrfs. The first DF is a set of wide rows (Spark UI shows 300 GB) and the second is relatively small (Spark shows 20 GB).

I have enough resources in my cluster to perform the job but I don't like the fact that the output datasource contains 200 part orc files (as spark.sql.shuffle.partitions defaults to 200), so before saving orc to emrfs I'm doing .coalesce(10). From the documentation coalesce looks like a quite harmless operation: no repartitioning etc.

But with such a setup my job fails to write the dataset on the last stage. Right now the error is OOM: GC overhead. When I change .coalesce(10) to .coalesce(100) the job runs much faster and finishes without errors.

So what's the impact of .coalesce in this case? And how to do in-place concatenation of files (not involving hive) to end up with a smaller number of bigger files, as with .coalesce(100) the job generates 100 orc snappy encoded files of ~300MB each.

Thanks,
Andrii
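One way to see why coalesce(10) is so much heavier than coalesce(100): coalesce does not shuffle, it merges existing partitions into fewer, larger ones, and the preceding work is pipelined into those fewer tasks. The toy model below is a hypothetical pure-Scala illustration of that grouping (Spark's actual coalescer is locality-aware, so the assignment differs in practice), showing how per-task data volume grows as the target partition count shrinks:

```scala
// Model partitions as sequences of rows; coalesceModel(partitions, k)
// merges the existing partitions into k groups without redistributing
// individual rows between keys - the essence of a no-shuffle coalesce.
def coalesceModel[A](partitions: Seq[Seq[A]], k: Int): Seq[Seq[A]] =
  partitions.zipWithIndex
    .groupBy { case (_, i) => i % k }                 // assign each old partition to a group
    .toSeq.sortBy { case (group, _) => group }
    .map { case (_, ps) => ps.flatMap { case (p, _) => p } }

// 200 shuffle partitions (spark.sql.shuffle.partitions default),
// 15 "rows" each as a stand-in for ~1.5 GB of data per partition:
val before = Seq.fill(200)(Seq.fill(15)("row"))

coalesceModel(before, 10).map(_.size)   // 10 partitions, each 20x the original size
coalesceModel(before, 100).map(_.size)  // 100 partitions, each only 2x the original size
```

In other words, with .coalesce(10) each of the 10 write tasks processes the output of 20 of the original partitions, which is consistent with the GC overhead observed; .repartition(10) would add a shuffle but keep the join itself at full parallelism.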
Re: MapWithState partitioning
Thanks,

As I understand, for the Kafka case the way to do it is to define my kafka.Partitioner that is used when data is produced to Kafka, and just reuse this partitioner as a spark.Partitioner in the mapWithState spec. I think I'll stick with that.

Thanks,
Andrii

2016-10-31 16:55 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> You may know that those streams share the same keys, but Spark doesn't
> unless you tell it.
>
> mapWithState takes a StateSpec, which should allow you to specify a
> partitioner.
>
> On Mon, Oct 31, 2016 at 9:40 AM, Andrii Biletskyi <andrb...@gmail.com> wrote:
> > Thanks for the response,
> >
> > So as I understand there is no way to "tell" mapWithState to leave the
> > partitioning schema as is, the way any other transformation would normally do.
> > Then I would like to clarify if there is a simple way to do a
> > transformation on a key-value stream and somehow specify the Partitioner that
> > effectively would result in the same partitioning schema as the original stream.
> > I.e.:
> >
> > stream.mapPartitions({ crs =>
> >   crs.map { cr =>
> >     cr.key() -> cr.value()
> >   }
> > }) <--- specify somehow the Partitioner here for the resulting rdd.
> >
> > The reason I ask is that it simply looks strange to me that Spark will have
> > to shuffle my input stream and the "state" stream each time during the
> > mapWithState operation when I know for sure that those two streams will
> > always share the same keys and will not need access to other partitions.
> >
> > Thanks,
> > Andrii
> >
> > 2016-10-31 15:45 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> >> If you call a transformation on an rdd using the same partitioner as that
> >> rdd, no shuffle will occur. KafkaRDD doesn't have a partitioner, there's no
> >> consistent partitioning scheme that works for all kafka uses. You can wrap
> >> each KafkaRDD with an rdd that has a custom partitioner that you write to
> >> match your kafka partitioning scheme, and avoid a shuffle.
> >>
> >> The danger there is if you have any misbehaving producers, or translate
> >> the partitioning wrongly, you'll get bad results. It's safer just to
> >> shuffle.
> >>
> >> On Oct 31, 2016 04:31, "Andrii Biletskyi" <andrii.bilets...@yahoo.com.invalid> wrote:
> >>
> >> Hi all,
> >>
> >> I'm using the Spark Streaming mapWithState operation to do a stateful
> >> operation on my Kafka stream (though I think similar arguments would apply
> >> for any source).
> >>
> >> I'm trying to understand a way to control mapWithState's partitioning schema.
> >>
> >> My transformations are simple:
> >>
> >> 1) create a KafkaDStream
> >> 2) mapPartitions to get a key-value stream where `key` corresponds to
> >> the Kafka message key
> >> 3) apply the mapWithState operation on the key-value stream; the state stream
> >> shares keys with the original stream, and the resulting stream doesn't change
> >> keys either
> >>
> >> The problem is that, as I understand, the mapWithState stream has a different
> >> partitioning schema and thus I see shuffles in the Spark Web UI.
> >>
> >> From the mapWithState implementation I see that mapWithState uses a
> >> Partitioner if specified, otherwise it partitions data with
> >> HashPartitioner(). The thing is that the original
> >> KafkaDStream has a specific partitioning schema: Kafka partitions correspond to
> >> Spark RDD partitions.
> >>
> >> Question: is there a way for the mapWithState stream to inherit the partitioning
> >> schema from the original stream (i.e. correspond to Kafka partitions)?
> >>
> >> Thanks,
> >> Andrii
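The "reuse the producer's partitioner on the Spark side" idea can be sketched as below. Only the partition-assignment function is shown, without the Spark dependency; in a real job this logic would live in a class extending org.apache.spark.Partitioner so it can be passed through StateSpec's partitioner setting. The hash(key) mod numPartitions scheme is an assumption standing in for whatever your kafka.Partitioner actually does, and KafkaLikePartitioner is a hypothetical name:

```scala
// Hypothetical partition-assignment function mirroring a producer-side
// partitioner that assigns hash(key) % numPartitions. It must reproduce
// the producer's assignment exactly, or state will be grouped wrongly.
class KafkaLikePartitioner(val numPartitions: Int) {
  require(numPartitions > 0, "need at least one partition")

  def getPartition(key: Any): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw  // keep result in [0, numPartitions)
  }
}

// Constructed with the same partition count as the Kafka topic,
// so Spark partitions line up one-to-one with Kafka partitions.
val p = new KafkaLikePartitioner(12)
```

With matching logic on both sides, each mapWithState partition only ever sees keys from one Kafka partition, so the shuffle can be avoided; as Cody notes above, if the producer side ever diverges from this function the results are silently wrong, which is the risk you accept by skipping the shuffle.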
MapWithState partitioning
Hi all,

I'm using the Spark Streaming mapWithState operation to do a stateful operation on my Kafka stream (though I think similar arguments would apply for any source).

I'm trying to understand a way to control mapWithState's partitioning schema. My transformations are simple:

1) create a KafkaDStream
2) mapPartitions to get a key-value stream where `key` corresponds to the Kafka message key
3) apply the mapWithState operation on the key-value stream; the state stream shares keys with the original stream, and the resulting stream doesn't change keys either

The problem is that, as I understand, the mapWithState stream has a different partitioning schema and thus I see shuffles in the Spark Web UI.

From the mapWithState implementation I see that mapWithState uses a Partitioner if specified, otherwise it partitions data with HashPartitioner(). The thing is that the original KafkaDStream has a specific partitioning schema: Kafka partitions correspond to Spark RDD partitions.

Question: is there a way for the mapWithState stream to inherit the partitioning schema from the original stream (i.e. correspond to Kafka partitions)?

Thanks,
Andrii