Ok, with:

.set("spark.streaming.backpressure.enabled","true")
.set("spark.streaming.receiver.maxRate", "10000")
.set("spark.streaming.kafka.maxRatePerPartition", "10000")

I have something like

***************************************************************************
Processing time: 5626
Expected time: 10000
Processed messages: 100000
Message example: {"message": 950002,
"uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
Recovered json:
{"message":950002,"uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}

So yes, it works, but the throughput is much lower than without the limit,
because this setting is an absolute upper limit. And the processing time is
only about half of the available time.
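
As a rough sanity check of the numbers above, here is a minimal sketch in
plain Java (no Spark dependency). It assumes that the 10000 ms "Expected time"
is the batch interval and that the topic has a single partition; neither is
stated explicitly in this thread, and the class and method names are only
illustrative. Under those assumptions the direct stream reads at most
maxRatePerPartition * partitions * batch seconds messages per batch, which
matches the 100000 processed messages reported above.

```java
public class BatchCap {
    // Upper bound on messages per batch for the direct stream:
    // maxRatePerPartition (messages/sec/partition) * partitions * batch length.
    public static long maxMessagesPerBatch(long maxRatePerPartition,
                                           int partitions,
                                           long batchMillis) {
        return maxRatePerPartition * partitions * batchMillis / 1000;
    }

    // Processing rate in messages per second for one reported batch.
    public static double messagesPerSecond(long messages, long processingMillis) {
        return messages * 1000.0 / processingMillis;
    }

    public static void main(String[] args) {
        // Assumed: 1 partition, 10 s batch interval (not confirmed in the thread).
        long cap = maxMessagesPerBatch(10000L, 1, 10000L); // 100000
        System.out.println("Max messages per batch: " + cap);

        // Figures taken from the two reports quoted in this thread.
        System.out.printf("Rate with the limit:    %.0f msg/s%n",
                messagesPerSecond(100000L, 5626L));
        System.out.printf("Rate without the limit: %.0f msg/s%n",
                messagesPerSecond(3015830L, 36994L));
    }
}
```

The rates it prints are only derived from the reports in this thread (100000
messages in 5626 ms with the limit, 3015830 messages in 36994 ms without it).
They are not a benchmark.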

Regarding Spark 2.0 structured streaming, I will look at it somewhat later.
For now I don't know how to strictly measure the throughput and latency of
this high-level API. My aim now is to compare streaming processors.


2016-07-06 17:41 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> The configuration you set is spark.streaming.receiver.maxRate.  The
> direct stream is not a receiver.  As I said in my first message in
> this thread, and as the pages at
>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
> and http://spark.apache.org/docs/latest/configuration.html#spark-streaming
> also say, use maxRatePerPartition for the direct stream.
>
> Bottom line, if you have more information than your system can process
> in X amount of time, after X amount of time you can either give the
> wrong answer, or take longer to process.  Flink can't violate the laws
> of physics.  If the tradeoffs that Flink makes are better for your use
> case than the tradeoffs that DStreams make, you may be better off
> using Flink (or testing out spark 2.0 structured streaming, although
> there's no kafka integration available for that yet)
>
> On Wed, Jul 6, 2016 at 10:25 AM, rss rss <rssde...@gmail.com> wrote:
> > OK, thanks. I tried to set a minimal max rate to begin with. However, in
> > general I don't know the initial throughput. BTW, it would be very useful
> > to explain this in
> > https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
> >
> > And really with
> >
> > .set("spark.streaming.backpressure.enabled","true")
> > .set("spark.streaming.receiver.maxRate", "10000")
> >
> > I have the same problem:
> >
> > ***************************************************************************
> > Processing time: 36994
> > Expected time: 10000
> > Processed messages: 3015830
> > Message example: {"message": 1,
> > "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> > Recovered json:
> > {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> >
> >
> > Regarding auto.offset.reset smallest: for now it is set because this is a
> > test and I want to get the same messages on each run. But in any case I
> > expect to read all new messages from the queue.
> >
> > Regarding backpressure detection: what should be done when processing is
> > much slower than the input rate? Now I see the processing time growing
> > instead of a stable 10 seconds, and the number of processed messages
> > decreasing. Where is the limit of the backpressure algorithm?
> >
> > Regarding Flink: I don't know how the core of Flink works, but you can
> > check for yourself that Flink strictly terminates processing of messages
> > by time. A deviation of the time window from 10 seconds to several minutes
> > is impossible.
> >
> > PS: I prepared this example to make it easy to observe the problem and
> > fix it if it is a bug. For me it is obvious. May I ask you to take a
> > closer look at this simple source code? Otherwise I have to conclude that
> > this is a technical limitation of Spark when working with unstable data
> > flows.
> >
> > Cheers
> >
> > 2016-07-06 16:40 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> >>
> >> The direct stream determines batch sizes on the driver, in advance of
> >> processing.  If you haven't specified a maximum batch size, how would
> >> you suggest the backpressure code determine how to limit the first
> >> batch?  It has no data on throughput until at least one batch is
> >> completed.
> >>
> >> Again, this is why I'm saying test by producing messages into kafka at
> >> a rate comparable to production, not loading a ton of messages in and
> >> starting from auto.offset.reset smallest.
> >>
> >> Even if you're unwilling to take that advice for some reason, and
> >> unwilling to empirically determine a reasonable maximum partition
> >> size, you should be able to estimate an upper bound such that the
> >> first batch does not encompass your entire kafka retention.
> >> Backpressure will kick in once it has some information to work with.
> >>
> >> On Wed, Jul 6, 2016 at 8:45 AM, rss rss <rssde...@gmail.com> wrote:
> >> > Hello,
> >> >
> >> >   thanks, I tried .set("spark.streaming.backpressure.enabled","true")
> >> > but the result is negative. Therefore I have prepared a small test:
> >> > https://github.com/rssdev10/spark-kafka-streaming
> >> >
> >> >   How to run:
> >> >   git clone https://github.com/rssdev10/spark-kafka-streaming.git
> >> >   cd spark-kafka-streaming
> >> >
> >> >   # downloads kafka and zookeeper
> >> >   ./gradlew setup
> >> >
> >> >   # run zookeeper, kafka, and run messages generation
> >> >   ./gradlew test_data_prepare
> >> >
> >> > And in another console just run:
> >> >    ./gradlew test_spark
> >> >
> >> > It is easy to interrupt data generation with CTRL-C and to resume it
> >> > with the same command:
> >> > ./gradlew test_data_prepare
> >> >
> >> > To stop everything, run:
> >> >   ./gradlew stop_all
> >> >
> >> > The Spark test should produce output every 10 seconds, like:
> >> >
> >> > ***************************************************************************
> >> > Processing time: 33477
> >> > Expected time: 10000
> >> > Processed messages: 2897866
> >> > Message example: {"message": 1,
> >> > "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> >> > Recovered json:
> >> > {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> >> >
> >> >
> >> > "message" is the number of the first message in the window. Time values
> >> > are in milliseconds.
> >> >
> >> > Brief results:
> >> >
> >> > Spark always reads all messages from Kafka after the first connection,
> >> > independently of the dstream or window duration. It looks like a bug.
> >> >
> >> > When the processing speed of the Spark app is close to the speed of data
> >> > generation, all is OK.
> >> >
> >> > I added delayFactor in
> >> > https://github.com/rssdev10/spark-kafka-streaming/blob/master/src/main/java/SparkStreamingConsumer.java
> >> > to emulate slow processing, and with it the streaming process degrades.
> >> > When delayFactor=0 it looks like a stable process.
> >> >
> >> >
> >> > Cheers
> >> >
> >> >
> >> > 2016-07-05 17:51 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> >> >>
> >> >> Test by producing messages into kafka at a rate comparable to what you
> >> >> expect in production.
> >> >>
> >> >> Test with backpressure turned on, it doesn't require you to specify a
> >> >> fixed limit on number of messages and will do its best to maintain
> >> >> batch timing.  Or you could empirically determine a reasonable fixed
> >> >> limit.
> >> >>
> >> >> Setting up a kafka topic with way more static messages in it than your
> >> >> system can handle in one batch, and then starting a stream from the
> >> >> beginning of it without turning on backpressure or limiting the number
> >> >> of messages... isn't a reasonable way to test steady state
> >> >> performance.  Flink can't magically give you a correct answer under
> >> >> those circumstances either.
> >> >>
> >> >> On Tue, Jul 5, 2016 at 10:41 AM, rss rss <rssde...@gmail.com> wrote:
> >> >> > Hi, thanks.
> >> >> >
> >> >> >    I know about the possibility to limit the number of messages. But
> >> >> > the problem is that I don't know the number of messages the system is
> >> >> > able to process. It depends on the data. The example is very simple. I
> >> >> > need a strict response after a specified time, something like soft
> >> >> > real time. In the case of Flink I am able to set up a strict
> >> >> > processing time like this:
> >> >> >
> >> >> > // Key events by the first byte of the user id.
> >> >> > KeyedStream<Event, Integer> keyed =
> >> >> >         eventStream.keyBy(event -> event.userId.getBytes()[0] % partNum);
> >> >> > WindowedStream<Event, Integer, TimeWindow> uniqUsersWin =
> >> >> >         keyed.timeWindow(Time.seconds(10));
> >> >> > // Fire each 10-second window on processing time.
> >> >> > DataStream<Aggregator> uniqUsers =
> >> >> >         uniqUsersWin.trigger(ProcessingTimeTrigger.create())
> >> >> >         .fold(new Aggregator(), new FoldFunction<Event, Aggregator>() {
> >> >> >             @Override
> >> >> >             public Aggregator fold(Aggregator accumulator, Event value)
> >> >> >                     throws Exception {
> >> >> >                 accumulator.add(value.userId);
> >> >> >                 return accumulator;
> >> >> >             }
> >> >> >         });
> >> >> >
> >> >> > uniqUsers.print();
> >> >> >
> >> >> > And I can see results every 10 seconds independently of the input data
> >> >> > stream. Is something like this possible in Spark?
> >> >> >
> >> >> > Regarding the zeros in my example: the reason is that I prepared the
> >> >> > message queue in Kafka in advance for the tests. If I add some messages
> >> >> > afterwards, I am able to see the new messages. But in any case I need
> >> >> > the first response after 10 seconds, not minutes or hours later.
> >> >> >
> >> >> > Thanks.
> >> >> >
> >> >> >
> >> >> >
> >> >> > 2016-07-05 17:12 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> >> >> >>
> >> >> >> If you're talking about limiting the number of messages per batch to
> >> >> >> try and keep from exceeding batch time, see
> >> >> >>
> >> >> >> http://spark.apache.org/docs/latest/configuration.html
> >> >> >>
> >> >> >> look for backpressure and maxRatePerPartition
> >> >> >>
> >> >> >>
> >> >> >> But if you're only seeing zeros after your job runs for a minute, it
> >> >> >> sounds like something else is wrong.
> >> >> >>
> >> >> >>
> >> >> >> On Tue, Jul 5, 2016 at 10:02 AM, rss rss <rssde...@gmail.com> wrote:
> >> >> >> > Hello,
> >> >> >> >
> >> >> >> >   I'm trying to organize processing of messages from Kafka. And there
> >> >> >> > is a typical case when the number of messages in Kafka's queue is
> >> >> >> > more than the Spark app is able to process. But I need a strict time
> >> >> >> > limit to prepare a result for at least a part of the data.
> >> >> >> >
> >> >> >> > Code example:
> >> >> >> >
> >> >> >> >         SparkConf sparkConf = new SparkConf()
> >> >> >> >                 .setAppName("Spark")
> >> >> >> >                 .setMaster("local");
> >> >> >> >
> >> >> >> >         JavaStreamingContext jssc =
> >> >> >> >                 new JavaStreamingContext(sparkConf, Milliseconds.apply(5000));
> >> >> >> >
> >> >> >> >         jssc.checkpoint("/tmp/spark_checkpoint");
> >> >> >> >
> >> >> >> >         Set<String> topicMap =
> >> >> >> >                 new HashSet<>(Arrays.asList(topicList.split(",")));
> >> >> >> >         Map<String, String> kafkaParams = new HashMap<String, String>() {
> >> >> >> >             {
> >> >> >> >                 put("metadata.broker.list", bootstrapServers);
> >> >> >> >                 put("auto.offset.reset", "smallest");
> >> >> >> >             }
> >> >> >> >         };
> >> >> >> >
> >> >> >> >         JavaPairInputDStream<String, String> messages =
> >> >> >> >                 KafkaUtils.createDirectStream(jssc,
> >> >> >> >                         String.class,
> >> >> >> >                         String.class,
> >> >> >> >                         StringDecoder.class,
> >> >> >> >                         StringDecoder.class,
> >> >> >> >                         kafkaParams,
> >> >> >> >                         topicMap);
> >> >> >> >
> >> >> >> >         messages.countByWindow(Seconds.apply(10), Milliseconds.apply(5000))
> >> >> >> >                 .map(x -> {System.out.println(x); return x;})
> >> >> >> >                 .dstream().saveAsTextFiles("/tmp/spark", "spark-streaming");
> >> >> >> >
> >> >> >> >
> >> >> >> >   I need to see a result of the window operation every 10 seconds
> >> >> >> > (this is only the simplest example). But in reality, with my test data
> >> >> >> > of ~10M messages, I get the first result a minute later and after that
> >> >> >> > I see only zeros. Is there a way to limit processing time to guarantee
> >> >> >> > a response in a specified time, like Apache Flink's triggers?
> >> >> >> >
> >> >> >> > Thanks.
> >> >> >
> >> >> >
> >> >
> >> >
> >
> >
>
