The direct stream determines batch sizes on the driver, in advance of
processing.  If you haven't specified a maximum batch size, how would
you suggest the backpressure code determine how to limit the first
batch?  It has no data on throughput until at least one batch is
completed.

Again, this is why I'm saying: test by producing messages into Kafka at
a rate comparable to production, not by loading a ton of messages in and
starting from auto.offset.reset = smallest.

Even if you're unwilling to take that advice for some reason, and
unwilling to empirically determine a reasonable maximum rate per
partition (spark.streaming.kafka.maxRatePerPartition), you should be
able to estimate an upper bound such that the first batch does not
encompass your entire Kafka retention. Backpressure will kick in once
it has some information to work with.
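
As a rough illustration of that estimate, here is a minimal sketch in
plain Java (no Spark dependency). It assumes the direct stream caps a
batch at roughly maxRatePerPartition x partitions x batch seconds. The
class and method names and the partition count of 4 are hypothetical,
and the throughput is derived from the 2897866 messages in 33477 ms
reported in the quoted test output, so substitute your own
measurements.

```java
public class BatchBoundEstimate {

    // Per-partition rate (messages/second) that keeps one batch at or below
    // messagesPerBatch. Never returns less than 1, so integer division
    // cannot round the limit down to 0.
    static long maxRatePerPartition(long messagesPerBatch, int partitions, long batchSeconds) {
        return Math.max(1L, messagesPerBatch / (partitions * batchSeconds));
    }

    // Upper bound on messages in one batch implied by a per-partition rate.
    static long maxMessagesPerBatch(long ratePerPartition, int partitions, long batchSeconds) {
        return ratePerPartition * partitions * batchSeconds;
    }

    public static void main(String[] args) {
        int partitions = 4;      // hypothetical: use your topic's partition count
        long batchSeconds = 10;  // batch interval used in the test ("Expected time: 10000")

        // Observed throughput from the quoted output: 2897866 messages in 33477 ms.
        long observedPerSecond = 2_897_866L * 1000L / 33_477L;

        // Messages the job could process within one batch interval at that throughput.
        long budget = observedPerSecond * batchSeconds;

        long rate = maxRatePerPartition(budget, partitions, batchSeconds);

        System.out.println("--conf spark.streaming.backpressure.enabled=true");
        System.out.println("--conf spark.streaming.kafka.maxRatePerPartition=" + rate);
        System.out.println("first batch is capped at about "
                + maxMessagesPerBatch(rate, partitions, batchSeconds) + " messages");
    }
}
```

Treat the resulting number only as a starting cap for the first batch;
with backpressure enabled the rate is adjusted once some batches have
completed.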

On Wed, Jul 6, 2016 at 8:45 AM, rss rss <rssde...@gmail.com> wrote:
> Hello,
>
>   thanks. I tried .set("spark.streaming.backpressure.enabled","true") but
> it did not help. Therefore I have prepared a small test:
> https://github.com/rssdev10/spark-kafka-streaming
>
>   How to run:
>   git clone https://github.com/rssdev10/spark-kafka-streaming.git
>   cd spark-kafka-streaming
>
>   # downloads kafka and zookeeper
>   ./gradlew setup
>
>   # run zookeeper, kafka, and run messages generation
>   ./gradlew test_data_prepare
>
> And in other console just run:
>    ./gradlew test_spark
>
> It is easy to break data generation by CTRL-C. And continue by same command
> ./gradlew test_data_prepare
>
> To stop all run:
>   ./gradlew stop_all
>
> Spark test must generate messages each 10 seconds like:
> ***************************************************************************
> Processing time: 33477
> Expected time: 10000
> Processed messages: 2897866
> Message example: {"message": 1,
> "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> Recovered json: {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>
>
> "message" is the number of the first message in the window. Time values are
> in milliseconds.
>
> Brief results:
>
> Spark always reads all messages from Kafka after the first connection,
> regardless of the DStream batch or window duration. It looks like a bug.
> When the processing speed of the Spark app is close to the speed of data
> generation, all is OK.
> I added delayFactor in
> https://github.com/rssdev10/spark-kafka-streaming/blob/master/src/main/java/SparkStreamingConsumer.java
> to emulate slow processing. With it, the streaming process degrades. When
> delayFactor=0 it looks like a stable process.
>
>
> Cheers
>
>
> 2016-07-05 17:51 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>
>> Test by producing messages into kafka at a rate comparable to what you
>> expect in production.
>>
>> Test with backpressure turned on, it doesn't require you to specify a
>> fixed limit on number of messages and will do its best to maintain
>> batch timing.  Or you could empirically determine a reasonable fixed
>> limit.
>>
>> Setting up a kafka topic with way more static messages in it than your
>> system can handle in one batch, and then starting a stream from the
>> beginning of it without turning on backpressure or limiting the number
>> of messages... isn't a reasonable way to test steady state
>> performance.  Flink can't magically give you a correct answer under
>> those circumstances either.
>>
>> On Tue, Jul 5, 2016 at 10:41 AM, rss rss <rssde...@gmail.com> wrote:
>> > Hi, thanks.
>> >
>> >    I know it is possible to limit the number of messages. But the problem
>> > is that I don't know how many messages the system is able to process; it
>> > depends on the data. The example is very simple. I need a strict response
>> > after a specified time, something like soft real time. In Flink I am able
>> > to set up a strict processing time like this:
>> >
>> > KeyedStream<Event, Integer> keyed =
>> >         eventStream.keyBy(event -> event.userId.getBytes()[0] % partNum);
>> > WindowedStream<Event, Integer, TimeWindow> uniqUsersWin =
>> >         keyed.timeWindow(Time.seconds(10));
>> > DataStream<Aggregator> uniqUsers =
>> >         uniqUsersWin.trigger(ProcessingTimeTrigger.create())
>> >         .fold(new Aggregator(), new FoldFunction<Event, Aggregator>() {
>> >             @Override
>> >             public Aggregator fold(Aggregator accumulator, Event value)
>> >                     throws Exception {
>> >                 accumulator.add(value.userId);
>> >                 return accumulator;
>> >             }
>> >         });
>> >
>> > uniqUsers.print();
>> >
>> > And I can see results every 10 seconds, independently of the input data
>> > stream. Is something like this possible in Spark?
>> >
>> > Regarding the zeros in my example: the reason is that I prepared the
>> > message queue in Kafka before the tests. If I add some messages afterwards,
>> > I am able to see the new messages. But in any case I need the first
>> > response after 10 seconds, not minutes or hours later.
>> >
>> > Thanks.
>> >
>> >
>> >
>> > 2016-07-05 17:12 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>> >>
>> >> If you're talking about limiting the number of messages per batch to
>> >> try and keep from exceeding batch time, see
>> >>
>> >> http://spark.apache.org/docs/latest/configuration.html
>> >>
>> >> look for backpressure and maxRatePerPartition
>> >>
>> >>
>> >> But if you're only seeing zeros after your job runs for a minute, it
>> >> sounds like something else is wrong.
>> >>
>> >>
>> >> On Tue, Jul 5, 2016 at 10:02 AM, rss rss <rssde...@gmail.com> wrote:
>> >> > Hello,
>> >> >
>> >> >   I'm trying to organize processing of messages from Kafka. A typical
>> >> > case is that the number of messages in Kafka's queue is more than the
>> >> > Spark app is able to process. But I need a strict time limit to
>> >> > prepare a result for at least part of the data.
>> >> >
>> >> > Code example:
>> >> >
>> >> >         SparkConf sparkConf = new SparkConf()
>> >> >                 .setAppName("Spark")
>> >> >                 .setMaster("local");
>> >> >
>> >> >         JavaStreamingContext jssc =
>> >> >                 new JavaStreamingContext(sparkConf, Milliseconds.apply(5000));
>> >> >
>> >> >         jssc.checkpoint("/tmp/spark_checkpoint");
>> >> >
>> >> >         Set<String> topicMap =
>> >> >                 new HashSet<>(Arrays.asList(topicList.split(",")));
>> >> >         Map<String, String> kafkaParams = new HashMap<String, String>() {
>> >> >             {
>> >> >                 put("metadata.broker.list", bootstrapServers);
>> >> >                 put("auto.offset.reset", "smallest");
>> >> >             }
>> >> >         };
>> >> >
>> >> >         JavaPairInputDStream<String, String> messages =
>> >> >                 KafkaUtils.createDirectStream(jssc,
>> >> >                         String.class,
>> >> >                         String.class,
>> >> >                         StringDecoder.class,
>> >> >                         StringDecoder.class,
>> >> >                         kafkaParams,
>> >> >                         topicMap);
>> >> >
>> >> >         messages.countByWindow(Seconds.apply(10), Milliseconds.apply(5000))
>> >> >                 .map(x -> {System.out.println(x); return x;})
>> >> >                 .dstream().saveAsTextFiles("/tmp/spark", "spark-streaming");
>> >> >
>> >> >
>> >> >   I need to see the result of the window operation every 10 seconds (this
>> >> > is only the simplest example). But with my test data of ~10M messages I get
>> >> > the first result a minute later, and after that I see only zeros. Is there
>> >> > a way to limit processing time to guarantee a response within a specified
>> >> > time, like Apache Flink's triggers?
>> >> >
>> >> > Thanks.
>> >
>> >
>
>
