Re: Serial batching with Spark Streaming
Could you help me understand why the time for processing of all jobs for a batch is always less than 4 seconds?

Please see my playground code below.

The last modified time of the input (lines) RDD dump files seems to match the Thread.sleep delays (20s or 5s) in the transform operation or the batching interval (10s): 20s, 5s, 10s.

However, neither the batch processing time in the Streaming tab nor the last modified time of the output (words) RDD dump files reflects the Thread.sleep delays.

07:20  3240  001_lines_...
07:21   117  001_words_...
07:41 37224  002_lines_...
07:43   252  002_words_...
08:00 37728  003_lines_...
08:02   504  003_words_...
08:20 38952  004_lines_...
08:22   756  004_words_...
08:40 38664  005_lines_...
08:42   999  005_words_...
08:45 38160  006_lines_...
08:47  1134  006_words_...
08:50  9720  007_lines_...
08:51  1260  007_words_...
08:55  9864  008_lines_...
08:56  1260  008_words_...
09:00 10656  009_lines_...
09:01  1395  009_words_...
09:05 11664  010_lines_...
09:06  1395  010_words_...
09:11 10935  011_lines_...
09:11  1521  011_words_...
09:16 11745  012_lines_...
09:16  1530  012_words_...
09:21 12069  013_lines_...
09:22  1656  013_words_...
09:27 10692  014_lines_...
09:27  1665  014_words_...
09:32 10449  015_lines_...
09:32  1791  015_words_...
09:37 11178  016_lines_...
09:37  1800  016_words_...
09:45 17496  017_lines_...
09:45  1926  017_words_...
09:55 22032  018_lines_...
09:56  2061  018_words_...
10:05 21951  019_lines_...
10:06  2196  019_words_...
10:15 21870  020_lines_...
10:16  2322  020_words_...
10:25 21303  021_lines_...
10:26  2340  021_words_...


final SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("WordCount");
try (final JavaStreamingContext context =
        new JavaStreamingContext(conf, Durations.seconds(10))) {

    context.checkpoint("/tmp/checkpoint");

    // Three receiver streams merged into a single DStream of lines.
    // GeneratorReceiver is a custom Receiver<String> (definition not included here).
    final JavaDStream<String> lines = context.union(
        context.receiverStream(new GeneratorReceiver()),
        ImmutableList.of(
            context.receiverStream(new GeneratorReceiver()),
            context.receiverStream(new GeneratorReceiver())));

    lines.print();

    // Dump every input batch to /tmp/NNN_lines_<uuid>.
    final Accumulator<Integer> lineRddIndex = context.sparkContext().accumulator(0);
    lines.foreachRDD(rdd -> {
        lineRddIndex.add(1);
        final String prefix = "/tmp/"
            + String.format("%03d", lineRddIndex.localValue()) + "_lines_";
        try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
            rdd.collect().forEach(s -> out.println(s));
        }
        return null;
    });

    // Per-batch word count.
    final JavaDStream<String> words =
        lines.flatMap(x -> Arrays.asList(x.split(" ")));
    final JavaPairDStream<String, Integer> pairs =
        words.mapToPair(s -> new Tuple2<>(s, 1));
    final JavaPairDStream<String, Integer> wordCounts =
        pairs.reduceByKey((i1, i2) -> i1 + i2);

    // Artificial delay: 2 ms for the first five batches, 5 s afterwards.
    final Accumulator<Integer> sleep = context.sparkContext().accumulator(0);
    final JavaPairDStream<String, Integer> wordCounts2 =
        JavaPairDStream.fromJavaDStream(
            wordCounts.transform(rdd -> {
                sleep.add(1);
                Thread.sleep(sleep.localValue() < 6 ? 2 : 5000);
                return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd), rdd.classTag());
            }));

    // Running total per word across batches.
    final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
        (values, state) -> {
            Integer newSum = state.or(0);
            for (final Integer value : values) {
                newSum += value;
            }
            return Optional.of(newSum);
        };

    final List<Tuple2<String, Integer>> tuples =
        ImmutableList.<Tuple2<String, Integer>>of();
    final JavaPairRDD<String, Integer> initialRDD =
        context.sparkContext().parallelizePairs(tuples);

    final JavaPairDStream<String, Integer> wordCountsState =
        wordCounts2.updateStateByKey(
            updateFunction,
            new HashPartitioner(context.sparkContext().defaultParallelism()),
            initialRDD);

    wordCountsState.print();

    // Dump the running word-count state to /tmp/NNN_words_<uuid> after every batch.
    final Accumulator<Integer> rddIndex = context.sparkContext().accumulator(0);
    wordCountsState.foreachRDD(rdd -> {
        rddIndex.add(1);
        final String prefix = "/tmp/"
            + String.format("%03d", rddIndex.localValue()) + "_words_";
        try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
            rdd.collect().forEach(s -> out.println(s));
        }
        return null;
    });

    context.start();
    context.awaitTermination();
}
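A note on where such a delay is counted (this is an assumption about DStream internals, not something confirmed in this thread): the function given to transform appears to be invoked on the driver when a batch's jobs are generated, whereas the body of foreachRDD runs as the batch's output job, so only the latter would show up as processing time in the Streaming tab. A minimal sketch, reusing wordCounts from the code above:

wordCounts.foreachRDD(rdd -> {
    // Hedged sketch: the artificial delay now executes inside this batch's output
    // job, so it should be attributed to the batch's processing time.
    Thread.sleep(5000);
    rdd.collect().forEach(t -> System.out.println(t));
    return null; // Spark 1.x Java API: Function<JavaPairRDD<String, Integer>, Void>
});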
Re: Serial batching with Spark Streaming
The default behavior should be that batch X + 1 starts processing only after batch X completes. If you are using Spark 1.4.0, could you show us a screenshot of the Streaming tab, especially the list of batches? And could you also tell us whether you are setting any SparkConf configurations?

On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia wrote:
> Is it possible to achieve serial batching with Spark Streaming?
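For reference, a minimal sketch of the kind of SparkConf being asked about here. The spark.streaming.concurrentJobs property is an undocumented setting believed to default to 1 (serial batch processing); treat the exact name and default as an assumption rather than something confirmed in this thread:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Hedged sketch: an explicit configuration intended to keep batch processing serial.
// "spark.streaming.concurrentJobs" is undocumented; 1 is assumed to be the default,
// and a value greater than 1 is assumed to allow jobs from different batches to overlap.
public final class SerialBatchingConf {
    public static void main(final String[] args) {
        final SparkConf conf = new SparkConf()
            .setMaster("local[4]")
            .setAppName("SerialBatching")
            .set("spark.streaming.concurrentJobs", "1");
        final JavaStreamingContext context =
            new JavaStreamingContext(conf, Durations.seconds(3));
        // ... build the DStream pipeline here, then:
        context.start();
        context.awaitTermination();
    }
}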
Serial batching with Spark Streaming
Is it possible to achieve serial batching with Spark Streaming?

Example:

I configure the Streaming Context for creating a batch every 3 seconds.

Processing of batch #2 takes longer than 3 seconds and creates a backlog of batches:

batch #1 takes 2s
batch #2 takes 10s
batch #3 takes 2s
batch #4 takes 2s

When testing locally, it seems that processing of multiple batches is finished at the same time:

batch #1 finished at 2s
batch #2 finished at 12s
batch #3 finished at 12s (processed in parallel)
batch #4 finished at 15s

How can I delay processing of the next batch, so that it is processed only after processing of the previous batch has been completed?

batch #1 finished at 2s
batch #2 finished at 12s
batch #3 finished at 14s (processed serially)
batch #4 finished at 16s

I want to perform a transformation for every key only once in a given period of time (e.g. the batch duration). I find all unique keys in a batch and perform the transformation on each key. To ensure that the transformation is done for every key only once, only one batch can be processed at a time. At the same time, I want that single batch to be processed in parallel.

context = new JavaStreamingContext(conf, Durations.seconds(10));
stream = context.receiverStream(...);
stream
    .reduceByKey(...)
    .transform(...)
    .foreachRDD(output);

Any ideas or pointers are very welcome.

Thanks!
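A slightly more concrete sketch of the pipeline outlined above, assuming the Spark 1.x Java API used elsewhere in this thread; the receiver class and the per-key action (MyReceiver, processKey) are hypothetical placeholders, not anything defined in this thread:

// Hedged sketch: reduceByKey leaves one record per unique key per batch, and the
// per-key work inside foreachRDD runs in parallel across the RDD's partitions,
// while batches themselves are processed one at a time by default (per the reply above).
final SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SerialBatching");
final JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(3));
final JavaReceiverInputDStream<String> stream = context.receiverStream(new MyReceiver());
stream
    .mapToPair(key -> new Tuple2<>(key, 1))
    .reduceByKey((a, b) -> a + b)              // one element per unique key in this batch
    .foreachRDD(rdd -> {
        rdd.foreach(t -> processKey(t._1()));  // hypothetical per-key transformation, runs on executors
        return null;                           // Spark 1.x Java API returns Void here
    });
context.start();
context.awaitTermination();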