Re: Spark streaming. Strict discretizing by time

2016-07-07 Thread rss rss
Hi,

  I changed auto.offset.reset to largest. The resulting batch times were 30, 50, 40, 40, 35, 30
seconds... instead of 10 seconds. It looks like an attempt to react via
backpressure, but a very slow one. In any case it is far from any real-time task,
including soft real time. So, fine, I accept that Spark should be used with data flows
that have no peaks, and with hardware held in hot reserve.
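
For reference, the change amounts to swapping the offset reset policy in the Kafka
parameters of the direct stream (a minimal sketch, reusing the 0.8-style parameter names
from the code quoted further down this thread; bootstrapServers is assumed to be defined
as in that code):

    Map<String, String> kafkaParams = new HashMap<String, String>() {
        {
            put("metadata.broker.list", bootstrapServers);
            // start from the latest offsets instead of replaying the whole backlog
            put("auto.offset.reset", "largest");
        }
    };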

  In case it is of interest, I added a Flink test with the same logic, purely as a reference. Just run
it with ./gradlew test_flink.
  https://github.com/rssdev10/spark-kafka-streaming

Cheers

2016-07-06 20:12 GMT+02:00 Cody Koeninger :

> > Yes and I sent you results. It is appropriate only with known parameters
> of input data stream.
>
> No, as far as I can tell from your posts in this thread and your
> linked project, you only tested with auto.offset.reset smallest and a
> large backlog.  That's not what I advised you to do.  Don't draw
> inaccurate conclusions about Spark DStreams from that test.  The
> reason you need to specify maxRatePerPartition is because you're
> starting with a large backlog and thus a large first batch.  If you
> were testing an ongoing stream with auto.offset.reset largest,
> backpressure alone should be sufficient.
>
>
>
> On Wed, Jul 6, 2016 at 12:23 PM, rss rss  wrote:
> >> If you aren't processing messages as fast as you receive them, you're
> >> going to run out of kafka retention regardless of whether you're using
> >> Spark or Flink.  Again, physics.  It's just a question of what
> >> compromises you choose.
> >
> >
> > Yes. I wrote about it. But in case of Flink you will have output strictly
> > after specified time. If it is impossible to process 1000 messages per 1
> > second but possible process 500, then Flink makes an output for 500. If
> only
> > 1 message processed, Flink produced an output for one only but after 1
> > second. At the same time Spark processes all 1000 but much longer that 1
> > second in this case.
> >
> >>  that's what backpressure
> >> and maxRatePerPartition are for.  As long as those are set reasonably,
> >> you'll have a reasonably fixed output interval.  Have you actually
> >> tested this in the way I suggested?
> >
> >
> > Yes and I sent you results. It is appropriate only with known parameters
> of
> > input data stream. I'm not able to estimate bounds of Sparks usage in
> > general. And I'm not about it. I'm about Spark has these limitations. And
> > most problem is when a calculation time depends on input data. That is if
> > you want to have a stable period of output data generation you have to
> use
> > computational system with free resources in hot reserve.
> >
> >  In any case thanks, now I understand how to use Spark.
> >
> > PS: I will continue work with Spark but to minimize emails stream I plan
> to
> > unsubscribe from this mail list
> >
> > 2016-07-06 18:55 GMT+02:00 Cody Koeninger :
> >>
> >> If you aren't processing messages as fast as you receive them, you're
> >> going to run out of kafka retention regardless of whether you're using
> >> Spark or Flink.  Again, physics.  It's just a question of what
> >> compromises you choose.
> >>
> >> If by "growing of a processing window time of Spark" you mean a
> >> processing time that exceeds batch time... that's what backpressure
> >> and maxRatePerPartition are for.  As long as those are set reasonably,
> >> you'll have a reasonably fixed output interval.  Have you actually
> >> tested this in the way I suggested?
> >>
> >> On Wed, Jul 6, 2016 at 11:38 AM, rss rss  wrote:
> >> > Ok, thanks. But really this is not full decision. In case of growing
> of
> >> > processing time I will have growing of window time. That is really
> with
> >> > Spark I have 2 points of a latency growing. First is a delay of
> >> > processing
> >> > of messages in Kafka queue due to physical limitation of a computer
> >> > system.
> >> > And second one is growing of a processing window time of Spark. In
> case
> >> > of
> >> > Flink there is only first point of delay but time intervals of output
> >> > data
> >> > are fixed. It is really looks like limitation of Spark. That is if
> >> > dataflow
> >> > is stable, all is ok. If there are peaks of loading more than
> >> > possibility of
> >> > computational system or data dependent time of calculation, Spark is
> not
> >> > able to provide a periodically stable results output. Sometimes this
> is
> >> > appropriate but sometime this is not appropriate.
> >> >
> >> > 2016-07-06 18:11 GMT+02:00 Cody Koeninger :
> >> >>
> >> >> Then double the upper limit you have set until the processing time
> >> >> approaches the batch time.
> >> >>
> >> >> On Wed, Jul 6, 2016 at 11:06 AM, rss rss  wrote:
> >> >> > Ok, with:
> >> >> >
> >> >> > .set("spark.streaming.backpressure.enabled","true")
> >> >> > .set("spark.streaming.receiver.maxRate", "1")
> >> >> > .set("spark.streaming.kafka.maxRatePerPartition", 

Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread Cody Koeninger
> Yes and I sent you results. It is appropriate only with known parameters of 
> input data stream.

No, as far as I can tell from your posts in this thread and your
linked project, you only tested with auto.offset.reset smallest and a
large backlog.  That's not what I advised you to do.  Don't draw
inaccurate conclusions about Spark DStreams from that test.  The
reason you need to specify maxRatePerPartition is because you're
starting with a large backlog and thus a large first batch.  If you
were testing an ongoing stream with auto.offset.reset largest,
backpressure alone should be sufficient.
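
For the ongoing-stream case, that configuration would look roughly like the following (a
sketch, not tested here; both properties already appear elsewhere in this thread, and the
per-partition cap is optional once backpressure has data to work with):

    SparkConf conf = new SparkConf()
            .setAppName("Spark")
            .setMaster("local")
            // adapt the per-batch rate from observed processing times
            .set("spark.streaming.backpressure.enabled", "true")
            // optional hard cap for the direct stream, per Kafka partition per second
            .set("spark.streaming.kafka.maxRatePerPartition", "10000");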



On Wed, Jul 6, 2016 at 12:23 PM, rss rss  wrote:
>> If you aren't processing messages as fast as you receive them, you're
>> going to run out of kafka retention regardless of whether you're using
>> Spark or Flink.  Again, physics.  It's just a question of what
>> compromises you choose.
>
>
> Yes. I wrote about it. But in case of Flink you will have output strictly
> after specified time. If it is impossible to process 1000 messages per 1
> second but possible process 500, then Flink makes an output for 500. If only
> 1 message processed, Flink produced an output for one only but after 1
> second. At the same time Spark processes all 1000 but much longer that 1
> second in this case.
>
>>  that's what backpressure
>> and maxRatePerPartition are for.  As long as those are set reasonably,
>> you'll have a reasonably fixed output interval.  Have you actually
>> tested this in the way I suggested?
>
>
> Yes and I sent you results. It is appropriate only with known parameters of
> input data stream. I'm not able to estimate bounds of Sparks usage in
> general. And I'm not about it. I'm about Spark has these limitations. And
> most problem is when a calculation time depends on input data. That is if
> you want to have a stable period of output data generation you have to use
> computational system with free resources in hot reserve.
>
>  In any case thanks, now I understand how to use Spark.
>
> PS: I will continue work with Spark but to minimize emails stream I plan to
> unsubscribe from this mail list
>
> 2016-07-06 18:55 GMT+02:00 Cody Koeninger :
>>
>> If you aren't processing messages as fast as you receive them, you're
>> going to run out of kafka retention regardless of whether you're using
>> Spark or Flink.  Again, physics.  It's just a question of what
>> compromises you choose.
>>
>> If by "growing of a processing window time of Spark" you mean a
>> processing time that exceeds batch time... that's what backpressure
>> and maxRatePerPartition are for.  As long as those are set reasonably,
>> you'll have a reasonably fixed output interval.  Have you actually
>> tested this in the way I suggested?
>>
>> On Wed, Jul 6, 2016 at 11:38 AM, rss rss  wrote:
>> > Ok, thanks. But really this is not full decision. In case of growing of
>> > processing time I will have growing of window time. That is really with
>> > Spark I have 2 points of a latency growing. First is a delay of
>> > processing
>> > of messages in Kafka queue due to physical limitation of a computer
>> > system.
>> > And second one is growing of a processing window time of Spark. In case
>> > of
>> > Flink there is only first point of delay but time intervals of output
>> > data
>> > are fixed. It is really looks like limitation of Spark. That is if
>> > dataflow
>> > is stable, all is ok. If there are peaks of loading more than
>> > possibility of
>> > computational system or data dependent time of calculation, Spark is not
>> > able to provide a periodically stable results output. Sometimes this is
>> > appropriate but sometime this is not appropriate.
>> >
>> > 2016-07-06 18:11 GMT+02:00 Cody Koeninger :
>> >>
>> >> Then double the upper limit you have set until the processing time
>> >> approaches the batch time.
>> >>
>> >> On Wed, Jul 6, 2016 at 11:06 AM, rss rss  wrote:
>> >> > Ok, with:
>> >> >
>> >> > .set("spark.streaming.backpressure.enabled","true")
>> >> > .set("spark.streaming.receiver.maxRate", "1")
>> >> > .set("spark.streaming.kafka.maxRatePerPartition", "1")
>> >> >
>> >> > I have something like
>> >> >
>> >> >
>> >> >
>> >> > ***
>> >> > Processing time: 5626
>> >> > Expected time: 1
>> >> > Processed messages: 10
>> >> > Message example: {"message": 950002,
>> >> > "uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
>> >> > Recovered json:
>> >> > {"message":950002,"uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
>> >> >
>> >> > That is yes, it works but throughput is much less than without
>> >> > limitations
>> >> > because of this is an absolute upper limit. And time of processing is
>> >> > half
>> >> > of available.
>> >> >
>> >> > Regarding Spark 2.0 structured streaming I will look it some later.
>> >> > Now
>> >> > I
>> >> > don't know how to 

Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread rss rss
>
> If you aren't processing messages as fast as you receive them, you're
> going to run out of kafka retention regardless of whether you're using
> Spark or Flink.  Again, physics.  It's just a question of what
> compromises you choose.


Yes, I wrote about that. But with Flink you get output strictly
after the specified time. If it is impossible to process 1000 messages per
second but possible to process 500, Flink emits an output for the 500. If
only 1 message was processed, Flink emits an output for that single message, but still after 1
second. Spark, by contrast, processes all 1000, but takes much longer than 1
second to do so.

 that's what backpressure
> and maxRatePerPartition are for.  As long as those are set reasonably,
> you'll have a reasonably fixed output interval.  Have you actually
> tested this in the way I suggested?


Yes, and I sent you the results. That approach works only when the parameters of the
input data stream are known. I am not trying to estimate the bounds of Spark's usability in
general; my point is that Spark has these limitations, and
the worst case is when the calculation time depends on the input data. That is, if
you want a stable output interval, you have to run on a
computational system with free resources held in hot reserve.

 In any case, thanks; now I understand how to use Spark.

PS: I will continue working with Spark, but to reduce the volume of email I plan to
unsubscribe from this mailing list.

2016-07-06 18:55 GMT+02:00 Cody Koeninger :

> If you aren't processing messages as fast as you receive them, you're
> going to run out of kafka retention regardless of whether you're using
> Spark or Flink.  Again, physics.  It's just a question of what
> compromises you choose.
>
> If by "growing of a processing window time of Spark" you mean a
> processing time that exceeds batch time... that's what backpressure
> and maxRatePerPartition are for.  As long as those are set reasonably,
> you'll have a reasonably fixed output interval.  Have you actually
> tested this in the way I suggested?
>
> On Wed, Jul 6, 2016 at 11:38 AM, rss rss  wrote:
> > Ok, thanks. But really this is not full decision. In case of growing of
> > processing time I will have growing of window time. That is really with
> > Spark I have 2 points of a latency growing. First is a delay of
> processing
> > of messages in Kafka queue due to physical limitation of a computer
> system.
> > And second one is growing of a processing window time of Spark. In case
> of
> > Flink there is only first point of delay but time intervals of output
> data
> > are fixed. It is really looks like limitation of Spark. That is if
> dataflow
> > is stable, all is ok. If there are peaks of loading more than
> possibility of
> > computational system or data dependent time of calculation, Spark is not
> > able to provide a periodically stable results output. Sometimes this is
> > appropriate but sometime this is not appropriate.
> >
> > 2016-07-06 18:11 GMT+02:00 Cody Koeninger :
> >>
> >> Then double the upper limit you have set until the processing time
> >> approaches the batch time.
> >>
> >> On Wed, Jul 6, 2016 at 11:06 AM, rss rss  wrote:
> >> > Ok, with:
> >> >
> >> > .set("spark.streaming.backpressure.enabled","true")
> >> > .set("spark.streaming.receiver.maxRate", "1")
> >> > .set("spark.streaming.kafka.maxRatePerPartition", "1")
> >> >
> >> > I have something like
> >> >
> >> >
> >> >
> ***
> >> > Processing time: 5626
> >> > Expected time: 1
> >> > Processed messages: 10
> >> > Message example: {"message": 950002,
> >> > "uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
> >> > Recovered json:
> >> > {"message":950002,"uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
> >> >
> >> > That is yes, it works but throughput is much less than without
> >> > limitations
> >> > because of this is an absolute upper limit. And time of processing is
> >> > half
> >> > of available.
> >> >
> >> > Regarding Spark 2.0 structured streaming I will look it some later.
> Now
> >> > I
> >> > don't know how to strictly measure throughput and latency of this high
> >> > level
> >> > API. My aim now is to compare streaming processors.
> >> >
> >> >
> >> > 2016-07-06 17:41 GMT+02:00 Cody Koeninger :
> >> >>
> >> >> The configuration you set is spark.streaming.receiver.maxRate.  The
> >> >> direct stream is not a receiver.  As I said in my first message in
> >> >> this thread, and as the pages at
> >> >>
> >> >>
> >> >>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
> >> >> and
> >> >>
> http://spark.apache.org/docs/latest/configuration.html#spark-streaming
> >> >> also say, use maxRatePerPartition for the direct stream.
> >> >>
> >> >> Bottom line, if you have more information than your system can
> process
> >> >> in X amount of 

Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread Cody Koeninger
If you aren't processing messages as fast as you receive them, you're
going to run out of kafka retention regardless of whether you're using
Spark or Flink.  Again, physics.  It's just a question of what
compromises you choose.

If by "growing of a processing window time of Spark" you mean a
processing time that exceeds batch time... that's what backpressure
and maxRatePerPartition are for.  As long as those are set reasonably,
you'll have a reasonably fixed output interval.  Have you actually
tested this in the way I suggested?

On Wed, Jul 6, 2016 at 11:38 AM, rss rss  wrote:
> Ok, thanks. But really this is not full decision. In case of growing of
> processing time I will have growing of window time. That is really with
> Spark I have 2 points of a latency growing. First is a delay of processing
> of messages in Kafka queue due to physical limitation of a computer system.
> And second one is growing of a processing window time of Spark. In case of
> Flink there is only first point of delay but time intervals of output data
> are fixed. It is really looks like limitation of Spark. That is if dataflow
> is stable, all is ok. If there are peaks of loading more than possibility of
> computational system or data dependent time of calculation, Spark is not
> able to provide a periodically stable results output. Sometimes this is
> appropriate but sometime this is not appropriate.
>
> 2016-07-06 18:11 GMT+02:00 Cody Koeninger :
>>
>> Then double the upper limit you have set until the processing time
>> approaches the batch time.
>>
>> On Wed, Jul 6, 2016 at 11:06 AM, rss rss  wrote:
>> > Ok, with:
>> >
>> > .set("spark.streaming.backpressure.enabled","true")
>> > .set("spark.streaming.receiver.maxRate", "1")
>> > .set("spark.streaming.kafka.maxRatePerPartition", "1")
>> >
>> > I have something like
>> >
>> >
>> > ***
>> > Processing time: 5626
>> > Expected time: 1
>> > Processed messages: 10
>> > Message example: {"message": 950002,
>> > "uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
>> > Recovered json:
>> > {"message":950002,"uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
>> >
>> > That is yes, it works but throughput is much less than without
>> > limitations
>> > because of this is an absolute upper limit. And time of processing is
>> > half
>> > of available.
>> >
>> > Regarding Spark 2.0 structured streaming I will look it some later. Now
>> > I
>> > don't know how to strictly measure throughput and latency of this high
>> > level
>> > API. My aim now is to compare streaming processors.
>> >
>> >
>> > 2016-07-06 17:41 GMT+02:00 Cody Koeninger :
>> >>
>> >> The configuration you set is spark.streaming.receiver.maxRate.  The
>> >> direct stream is not a receiver.  As I said in my first message in
>> >> this thread, and as the pages at
>> >>
>> >>
>> >> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>> >> and
>> >> http://spark.apache.org/docs/latest/configuration.html#spark-streaming
>> >> also say, use maxRatePerPartition for the direct stream.
>> >>
>> >> Bottom line, if you have more information than your system can process
>> >> in X amount of time, after X amount of time you can either give the
>> >> wrong answer, or take longer to process.  Flink can't violate the laws
>> >> of physics.  If the tradeoffs that Flink make are better for your use
>> >> case than the tradeoffs that DStreams make, you may be better off
>> >> using Flink (or testing out spark 2.0 structured streaming, although
>> >> there's no kafka integration available for that yet)
>> >>
>> >> On Wed, Jul 6, 2016 at 10:25 AM, rss rss  wrote:
>> >> > ok, thanks. I tried  to set minimum max rate for beginning. However
>> >> > in
>> >> > general I don't know initial throughput. BTW it would be very useful
>> >> > to
>> >> > explain it in
>> >> >
>> >> >
>> >> > https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
>> >> >
>> >> > And really with
>> >> >
>> >> > .set("spark.streaming.backpressure.enabled","true")
>> >> > .set("spark.streaming.receiver.maxRate", "1")
>> >> >
>> >> > I have same problem:
>> >> >
>> >> >
>> >> > ***
>> >> > Processing time: 36994
>> >> > Expected time: 1
>> >> > Processed messages: 3015830
>> >> > Message example: {"message": 1,
>> >> > "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> >> > Recovered json:
>> >> > {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> >> >
>> >> >
>> >> > Regarding auto.offset.reset smallest, now it is because of a test and
>> >> > I
>> >> > want
>> >> > to get same messages for each run. But in any case I expect to read
>> >> > all
>> >> > new
>> >> > messages from queue.
>> >> >
>> >> > Regarding backpressure 

Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread rss rss
Ok, thanks. But this is not really a complete solution. If the
processing time grows, the window time grows with it. That is, with
Spark there are two sources of growing latency. The first is the delay in processing
messages from the Kafka queue due to the physical limits of the computer system.
The second is the growth of Spark's processing window time. With
Flink only the first source of delay exists; the time intervals of the output data
are fixed. This really looks like a limitation of Spark: if the dataflow
is stable, all is fine, but if there are load peaks beyond the capacity
of the computational system, or the calculation time depends on the data, Spark is
not able to produce results at a stable period. Sometimes that is
acceptable, and sometimes it is not.

2016-07-06 18:11 GMT+02:00 Cody Koeninger :

> Then double the upper limit you have set until the processing time
> approaches the batch time.
>
> On Wed, Jul 6, 2016 at 11:06 AM, rss rss  wrote:
> > Ok, with:
> >
> > .set("spark.streaming.backpressure.enabled","true")
> > .set("spark.streaming.receiver.maxRate", "1")
> > .set("spark.streaming.kafka.maxRatePerPartition", "1")
> >
> > I have something like
> >
> >
> ***
> > Processing time: 5626
> > Expected time: 1
> > Processed messages: 10
> > Message example: {"message": 950002,
> > "uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
> > Recovered json:
> > {"message":950002,"uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
> >
> > That is yes, it works but throughput is much less than without
> limitations
> > because of this is an absolute upper limit. And time of processing is
> half
> > of available.
> >
> > Regarding Spark 2.0 structured streaming I will look it some later. Now I
> > don't know how to strictly measure throughput and latency of this high
> level
> > API. My aim now is to compare streaming processors.
> >
> >
> > 2016-07-06 17:41 GMT+02:00 Cody Koeninger :
> >>
> >> The configuration you set is spark.streaming.receiver.maxRate.  The
> >> direct stream is not a receiver.  As I said in my first message in
> >> this thread, and as the pages at
> >>
> >>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
> >> and
> http://spark.apache.org/docs/latest/configuration.html#spark-streaming
> >> also say, use maxRatePerPartition for the direct stream.
> >>
> >> Bottom line, if you have more information than your system can process
> >> in X amount of time, after X amount of time you can either give the
> >> wrong answer, or take longer to process.  Flink can't violate the laws
> >> of physics.  If the tradeoffs that Flink make are better for your use
> >> case than the tradeoffs that DStreams make, you may be better off
> >> using Flink (or testing out spark 2.0 structured streaming, although
> >> there's no kafka integration available for that yet)
> >>
> >> On Wed, Jul 6, 2016 at 10:25 AM, rss rss  wrote:
> >> > ok, thanks. I tried  to set minimum max rate for beginning. However in
> >> > general I don't know initial throughput. BTW it would be very useful
> to
> >> > explain it in
> >> >
> >> >
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
> >> >
> >> > And really with
> >> >
> >> > .set("spark.streaming.backpressure.enabled","true")
> >> > .set("spark.streaming.receiver.maxRate", "1")
> >> >
> >> > I have same problem:
> >> >
> >> >
> ***
> >> > Processing time: 36994
> >> > Expected time: 1
> >> > Processed messages: 3015830
> >> > Message example: {"message": 1,
> >> > "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> >> > Recovered json:
> >> > {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> >> >
> >> >
> >> > Regarding auto.offset.reset smallest, now it is because of a test and
> I
> >> > want
> >> > to get same messages for each run. But in any case I expect to read
> all
> >> > new
> >> > messages from queue.
> >> >
> >> > Regarding backpressure detection. What is to do then a process time is
> >> > much
> >> > more then input rate? Now I see growing time of processing instead of
> >> > stable
> >> > 10 second and decreasing number of processed messages. Where is a
> limit
> >> > of
> >> > of backpressure algorithm?
> >> >
> >> > Regarding Flink. I don't know how works core of Flink but you can
> check
> >> > self
> >> > that Flink will strictly terminate processing of messages by time.
> >> > Deviation
> >> > of the time window from 10 seconds to several minutes is impossible.
> >> >
> >> > PS: I prepared this example to make possible easy observe the problem
> >> > and
> >> > fix it if it is a bug. For me it is obvious. May I ask you to be near
> to
> >> > this simple source code? In other case I have 

Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread Cody Koeninger
Then double the upper limit you have set until the processing time
approaches the batch time.

On Wed, Jul 6, 2016 at 11:06 AM, rss rss  wrote:
> Ok, with:
>
> .set("spark.streaming.backpressure.enabled","true")
> .set("spark.streaming.receiver.maxRate", "1")
> .set("spark.streaming.kafka.maxRatePerPartition", "1")
>
> I have something like
>
> ***
> Processing time: 5626
> Expected time: 1
> Processed messages: 10
> Message example: {"message": 950002,
> "uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
> Recovered json:
> {"message":950002,"uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
>
> That is yes, it works but throughput is much less than without limitations
> because of this is an absolute upper limit. And time of processing is half
> of available.
>
> Regarding Spark 2.0 structured streaming I will look it some later. Now I
> don't know how to strictly measure throughput and latency of this high level
> API. My aim now is to compare streaming processors.
>
>
> 2016-07-06 17:41 GMT+02:00 Cody Koeninger :
>>
>> The configuration you set is spark.streaming.receiver.maxRate.  The
>> direct stream is not a receiver.  As I said in my first message in
>> this thread, and as the pages at
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>> and http://spark.apache.org/docs/latest/configuration.html#spark-streaming
>> also say, use maxRatePerPartition for the direct stream.
>>
>> Bottom line, if you have more information than your system can process
>> in X amount of time, after X amount of time you can either give the
>> wrong answer, or take longer to process.  Flink can't violate the laws
>> of physics.  If the tradeoffs that Flink make are better for your use
>> case than the tradeoffs that DStreams make, you may be better off
>> using Flink (or testing out spark 2.0 structured streaming, although
>> there's no kafka integration available for that yet)
>>
>> On Wed, Jul 6, 2016 at 10:25 AM, rss rss  wrote:
>> > ok, thanks. I tried  to set minimum max rate for beginning. However in
>> > general I don't know initial throughput. BTW it would be very useful to
>> > explain it in
>> >
>> > https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
>> >
>> > And really with
>> >
>> > .set("spark.streaming.backpressure.enabled","true")
>> > .set("spark.streaming.receiver.maxRate", "1")
>> >
>> > I have same problem:
>> >
>> > ***
>> > Processing time: 36994
>> > Expected time: 1
>> > Processed messages: 3015830
>> > Message example: {"message": 1,
>> > "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> > Recovered json:
>> > {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> >
>> >
>> > Regarding auto.offset.reset smallest, now it is because of a test and I
>> > want
>> > to get same messages for each run. But in any case I expect to read all
>> > new
>> > messages from queue.
>> >
>> > Regarding backpressure detection. What is to do then a process time is
>> > much
>> > more then input rate? Now I see growing time of processing instead of
>> > stable
>> > 10 second and decreasing number of processed messages. Where is a limit
>> > of
>> > of backpressure algorithm?
>> >
>> > Regarding Flink. I don't know how works core of Flink but you can check
>> > self
>> > that Flink will strictly terminate processing of messages by time.
>> > Deviation
>> > of the time window from 10 seconds to several minutes is impossible.
>> >
>> > PS: I prepared this example to make possible easy observe the problem
>> > and
>> > fix it if it is a bug. For me it is obvious. May I ask you to be near to
>> > this simple source code? In other case I have to think that this is a
>> > technical limitation of Spark to work with unstable data flows.
>> >
>> > Cheers
>> >
>> > 2016-07-06 16:40 GMT+02:00 Cody Koeninger :
>> >>
>> >> The direct stream determines batch sizes on the driver, in advance of
>> >> processing.  If you haven't specified a maximum batch size, how would
>> >> you suggest the backpressure code determine how to limit the first
>> >> batch?  It has no data on throughput until at least one batch is
>> >> completed.
>> >>
>> >> Again, this is why I'm saying test by producing messages into kafka at
>> >> a rate comparable to production, not loading a ton of messages in and
>> >> starting from auto.offset.reset smallest.
>> >>
>> >> Even if you're unwilling to take that advice for some reason, and
>> >> unwilling to empirically determine a reasonable maximum partition
>> >> size, you should be able to estimate an upper bound such that the
>> >> first batch does not encompass your entire kafka retention.
>> >> Backpressure will kick in once it has some information to work with.
>> >>
>> 

Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread rss rss
Ok, with:

.set("spark.streaming.backpressure.enabled","true")
.set("spark.streaming.receiver.maxRate", "1")
.set("spark.streaming.kafka.maxRatePerPartition", "1")

I have something like

***
Processing time: 5626
Expected time: 1
Processed messages: 10
Message example: {"message": 950002,
"uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
Recovered json:
{"message":950002,"uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}

So yes, it works, but the throughput is much lower than without the limits,
because this is an absolute upper bound. And the processing time is only half
of what is available.
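
As a back-of-the-envelope check (assuming a single Kafka partition, which is not stated
explicitly in this thread): with the 10-second batch interval used in the test, a cap of
1 message per partition per second bounds each batch to 1 * 1 * 10 = 10 messages, which
matches the "Processed messages: 10" above. Since those 10 messages take only ~5.6 s of
the 10 s interval, roughly half the batch time sits idle; doubling the limit until the
processing time approaches the batch time, as suggested elsewhere in this thread, would
recover that headroom.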

Regarding Spark 2.0 structured streaming, I will look at it later. Right now I
don't know how to rigorously measure the throughput and latency of that high-level
API. My aim for now is to compare streaming processors.


2016-07-06 17:41 GMT+02:00 Cody Koeninger :

> The configuration you set is spark.streaming.receiver.maxRate.  The
> direct stream is not a receiver.  As I said in my first message in
> this thread, and as the pages at
>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
> and http://spark.apache.org/docs/latest/configuration.html#spark-streaming
> also say, use maxRatePerPartition for the direct stream.
>
> Bottom line, if you have more information than your system can process
> in X amount of time, after X amount of time you can either give the
> wrong answer, or take longer to process.  Flink can't violate the laws
> of physics.  If the tradeoffs that Flink make are better for your use
> case than the tradeoffs that DStreams make, you may be better off
> using Flink (or testing out spark 2.0 structured streaming, although
> there's no kafka integration available for that yet)
>
> On Wed, Jul 6, 2016 at 10:25 AM, rss rss  wrote:
> > ok, thanks. I tried  to set minimum max rate for beginning. However in
> > general I don't know initial throughput. BTW it would be very useful to
> > explain it in
> >
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
> >
> > And really with
> >
> > .set("spark.streaming.backpressure.enabled","true")
> > .set("spark.streaming.receiver.maxRate", "1")
> >
> > I have same problem:
> >
> ***
> > Processing time: 36994
> > Expected time: 1
> > Processed messages: 3015830
> > Message example: {"message": 1,
> > "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> > Recovered json:
> {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> >
> >
> > Regarding auto.offset.reset smallest, now it is because of a test and I
> want
> > to get same messages for each run. But in any case I expect to read all
> new
> > messages from queue.
> >
> > Regarding backpressure detection. What is to do then a process time is
> much
> > more then input rate? Now I see growing time of processing instead of
> stable
> > 10 second and decreasing number of processed messages. Where is a limit
> of
> > of backpressure algorithm?
> >
> > Regarding Flink. I don't know how works core of Flink but you can check
> self
> > that Flink will strictly terminate processing of messages by time.
> Deviation
> > of the time window from 10 seconds to several minutes is impossible.
> >
> > PS: I prepared this example to make possible easy observe the problem and
> > fix it if it is a bug. For me it is obvious. May I ask you to be near to
> > this simple source code? In other case I have to think that this is a
> > technical limitation of Spark to work with unstable data flows.
> >
> > Cheers
> >
> > 2016-07-06 16:40 GMT+02:00 Cody Koeninger :
> >>
> >> The direct stream determines batch sizes on the driver, in advance of
> >> processing.  If you haven't specified a maximum batch size, how would
> >> you suggest the backpressure code determine how to limit the first
> >> batch?  It has no data on throughput until at least one batch is
> >> completed.
> >>
> >> Again, this is why I'm saying test by producing messages into kafka at
> >> a rate comparable to production, not loading a ton of messages in and
> >> starting from auto.offset.reset smallest.
> >>
> >> Even if you're unwilling to take that advice for some reason, and
> >> unwilling to empirically determine a reasonable maximum partition
> >> size, you should be able to estimate an upper bound such that the
> >> first batch does not encompass your entire kafka retention.
> >> Backpressure will kick in once it has some information to work with.
> >>
> >> On Wed, Jul 6, 2016 at 8:45 AM, rss rss  wrote:
> >> > Hello,
> >> >
> >> >   thanks, I tried to
> .set("spark.streaming.backpressure.enabled","true")
> >> > but
> >> > result is negative. Therefore I have prepared small test
> >> > https://github.com/rssdev10/spark-kafka-streaming
> >> >
> >> >   

Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread Cody Koeninger
The configuration you set is spark.streaming.receiver.maxRate.  The
direct stream is not a receiver.  As I said in my first message in
this thread, and as the pages at
http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
and http://spark.apache.org/docs/latest/configuration.html#spark-streaming
also say, use maxRatePerPartition for the direct stream.
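
In code, the difference is simply which property is set; a minimal sketch (the receiver
setting is shown only to make clear what it applies to, and the rate values are
placeholders):

    SparkConf conf = new SparkConf()
            // applies to receiver-based streams only; has no effect on createDirectStream
            .set("spark.streaming.receiver.maxRate", "10000")
            // applies to the direct (receiver-less) Kafka stream, per partition per second
            .set("spark.streaming.kafka.maxRatePerPartition", "10000");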

Bottom line, if you have more information than your system can process
in X amount of time, after X amount of time you can either give the
wrong answer, or take longer to process.  Flink can't violate the laws
of physics.  If the tradeoffs that Flink make are better for your use
case than the tradeoffs that DStreams make, you may be better off
using Flink (or testing out spark 2.0 structured streaming, although
there's no kafka integration available for that yet)

On Wed, Jul 6, 2016 at 10:25 AM, rss rss  wrote:
> ok, thanks. I tried  to set minimum max rate for beginning. However in
> general I don't know initial throughput. BTW it would be very useful to
> explain it in
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
>
> And really with
>
> .set("spark.streaming.backpressure.enabled","true")
> .set("spark.streaming.receiver.maxRate", "1")
>
> I have same problem:
> ***
> Processing time: 36994
> Expected time: 1
> Processed messages: 3015830
> Message example: {"message": 1,
> "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> Recovered json: {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>
>
> Regarding auto.offset.reset smallest, now it is because of a test and I want
> to get same messages for each run. But in any case I expect to read all new
> messages from queue.
>
> Regarding backpressure detection. What is to do then a process time is much
> more then input rate? Now I see growing time of processing instead of stable
> 10 second and decreasing number of processed messages. Where is a limit of
> of backpressure algorithm?
>
> Regarding Flink. I don't know how works core of Flink but you can check self
> that Flink will strictly terminate processing of messages by time. Deviation
> of the time window from 10 seconds to several minutes is impossible.
>
> PS: I prepared this example to make possible easy observe the problem and
> fix it if it is a bug. For me it is obvious. May I ask you to be near to
> this simple source code? In other case I have to think that this is a
> technical limitation of Spark to work with unstable data flows.
>
> Cheers
>
> 2016-07-06 16:40 GMT+02:00 Cody Koeninger :
>>
>> The direct stream determines batch sizes on the driver, in advance of
>> processing.  If you haven't specified a maximum batch size, how would
>> you suggest the backpressure code determine how to limit the first
>> batch?  It has no data on throughput until at least one batch is
>> completed.
>>
>> Again, this is why I'm saying test by producing messages into kafka at
>> a rate comparable to production, not loading a ton of messages in and
>> starting from auto.offset.reset smallest.
>>
>> Even if you're unwilling to take that advice for some reason, and
>> unwilling to empirically determine a reasonable maximum partition
>> size, you should be able to estimate an upper bound such that the
>> first batch does not encompass your entire kafka retention.
>> Backpressure will kick in once it has some information to work with.
>>
>> On Wed, Jul 6, 2016 at 8:45 AM, rss rss  wrote:
>> > Hello,
>> >
>> >   thanks, I tried to .set("spark.streaming.backpressure.enabled","true")
>> > but
>> > result is negative. Therefore I have prepared small test
>> > https://github.com/rssdev10/spark-kafka-streaming
>> >
>> >   How to run:
>> >   git clone https://github.com/rssdev10/spark-kafka-streaming.git
>> >   cd spark-kafka-streaming
>> >
>> >   # downloads kafka and zookeeper
>> >   ./gradlew setup
>> >
>> >   # run zookeeper, kafka, and run messages generation
>> >   ./gradlew test_data_prepare
>> >
>> > And in other console just run:
>> >./gradlew test_spark
>> >
>> > It is easy to break data generation by CTRL-C. And continue by same
>> > command
>> > ./gradlew test_data_prepare
>> >
>> > To stop all run:
>> >   ./gradlew stop_all
>> >
>> > Spark test must generate messages each 10 seconds like:
>> >
>> > ***
>> > Processing time: 33477
>> > Expected time: 1
>> > Processed messages: 2897866
>> > Message example: {"message": 1,
>> > "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> > Recovered json:
>> > {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> >
>> >
>> > message is number of fist message in the window. Time values are in
>> > milliseconds.
>> >
>> > Brief results:
>> >
>> > Spark always reads all messaged from Kafka after 

Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread rss rss
Ok, thanks. I tried setting a minimal max rate to begin with. However, in
general I don't know the initial throughput. BTW, it would be very useful to
explain this in
https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

And really with

.set("spark.streaming.backpressure.enabled","true")
.set("spark.streaming.receiver.maxRate", "1")

I get the same problem:
***
Processing time: 36994
Expected time: 1
Processed messages: 3015830
Message example: {"message": 1,
"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
Recovered json: {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}


Regarding auto.offset.reset smallest: it is set that way because this is a test and I
want to get the same messages on each run. But in any case I expect to read
all new messages from the queue.

Regarding backpressure detection: what should happen when the processing time is much
greater than the input rate allows? Right now I see a growing processing time instead of
a stable 10 seconds, and a decreasing number of processed messages. Where is the
limit of the backpressure algorithm?

Regarding Flink: I don't know how Flink's core works, but you can check for
yourself that Flink strictly terminates the processing of messages by time. A
deviation of the time window from 10 seconds to several minutes is
impossible.

PS: I prepared this example to make it easy to observe the problem and
fix it if it is a bug; to me it is obvious. May I ask you to stay close to
this simple source code? Otherwise I have to conclude that this is a
technical limitation of Spark when working with unstable data flows.

Cheers

2016-07-06 16:40 GMT+02:00 Cody Koeninger :

> The direct stream determines batch sizes on the driver, in advance of
> processing.  If you haven't specified a maximum batch size, how would
> you suggest the backpressure code determine how to limit the first
> batch?  It has no data on throughput until at least one batch is
> completed.
>
> Again, this is why I'm saying test by producing messages into kafka at
> a rate comparable to production, not loading a ton of messages in and
> starting from auto.offset.reset smallest.
>
> Even if you're unwilling to take that advice for some reason, and
> unwilling to empirically determine a reasonable maximum partition
> size, you should be able to estimate an upper bound such that the
> first batch does not encompass your entire kafka retention.
> Backpressure will kick in once it has some information to work with.
>
> On Wed, Jul 6, 2016 at 8:45 AM, rss rss  wrote:
> > Hello,
> >
> >   thanks, I tried to .set("spark.streaming.backpressure.enabled","true")
> but
> > result is negative. Therefore I have prepared small test
> > https://github.com/rssdev10/spark-kafka-streaming
> >
> >   How to run:
> >   git clone https://github.com/rssdev10/spark-kafka-streaming.git
> >   cd spark-kafka-streaming
> >
> >   # downloads kafka and zookeeper
> >   ./gradlew setup
> >
> >   # run zookeeper, kafka, and run messages generation
> >   ./gradlew test_data_prepare
> >
> > And in other console just run:
> >./gradlew test_spark
> >
> > It is easy to break data generation by CTRL-C. And continue by same
> command
> > ./gradlew test_data_prepare
> >
> > To stop all run:
> >   ./gradlew stop_all
> >
> > Spark test must generate messages each 10 seconds like:
> >
> ***
> > Processing time: 33477
> > Expected time: 1
> > Processed messages: 2897866
> > Message example: {"message": 1,
> > "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> > Recovered json:
> {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> >
> >
> > message is number of fist message in the window. Time values are in
> > milliseconds.
> >
> > Brief results:
> >
> > Spark always reads all messaged from Kafka after first connection
> > independently on dstream or window size time. It looks like a bug.
> > When processing speed in Spark's app is near to speed of data generation
> all
> > is ok.
> > I added delayFactor in
> >
> https://github.com/rssdev10/spark-kafka-streaming/blob/master/src/main/java/SparkStreamingConsumer.java
> > to emulate slow processing. And streaming process is in degradation. When
> > delayFactor=0 it looks like stable process.
> >
> >
> > Cheers
> >
> >
> > 2016-07-05 17:51 GMT+02:00 Cody Koeninger :
> >>
> >> Test by producing messages into kafka at a rate comparable to what you
> >> expect in production.
> >>
> >> Test with backpressure turned on, it doesn't require you to specify a
> >> fixed limit on number of messages and will do its best to maintain
> >> batch timing.  Or you could empirically determine a reasonable fixed
> >> limit.
> >>
> >> Setting up a kafka topic with way more static messages in it than your
> >> system can handle in one batch, and then starting a stream from the
> >> beginning of it without turning on backpressure 

Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread Cody Koeninger
The direct stream determines batch sizes on the driver, in advance of
processing.  If you haven't specified a maximum batch size, how would
you suggest the backpressure code determine how to limit the first
batch?  It has no data on throughput until at least one batch is
completed.

Again, this is why I'm saying test by producing messages into kafka at
a rate comparable to production, not loading a ton of messages in and
starting from auto.offset.reset smallest.

Even if you're unwilling to take that advice for some reason, and
unwilling to empirically determine a reasonable maximum partition
size, you should be able to estimate an upper bound such that the
first batch does not encompass your entire kafka retention.
Backpressure will kick in once it has some information to work with.
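
As a rough illustration of that kind of estimate (the figures are assumptions chosen for
the arithmetic, not measurements from this thread): with the ~10M-message backlog used in
the test, a 10-second batch interval and, say, 4 partitions, setting
spark.streaming.kafka.maxRatePerPartition to 25000 bounds the first batch to
25000 * 4 * 10 = 1,000,000 messages, i.e. a tenth of the backlog rather than all of it,
and backpressure can tune the rate downward once the first batches complete.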

On Wed, Jul 6, 2016 at 8:45 AM, rss rss  wrote:
> Hello,
>
>   thanks, I tried to .set("spark.streaming.backpressure.enabled","true") but
> result is negative. Therefore I have prepared small test
> https://github.com/rssdev10/spark-kafka-streaming
>
>   How to run:
>   git clone https://github.com/rssdev10/spark-kafka-streaming.git
>   cd spark-kafka-streaming
>
>   # downloads kafka and zookeeper
>   ./gradlew setup
>
>   # run zookeeper, kafka, and run messages generation
>   ./gradlew test_data_prepare
>
> And in other console just run:
>./gradlew test_spark
>
> It is easy to break data generation by CTRL-C. And continue by same command
> ./gradlew test_data_prepare
>
> To stop all run:
>   ./gradlew stop_all
>
> Spark test must generate messages each 10 seconds like:
> ***
> Processing time: 33477
> Expected time: 1
> Processed messages: 2897866
> Message example: {"message": 1,
> "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
> Recovered json: {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>
>
> message is number of fist message in the window. Time values are in
> milliseconds.
>
> Brief results:
>
> Spark always reads all messaged from Kafka after first connection
> independently on dstream or window size time. It looks like a bug.
> When processing speed in Spark's app is near to speed of data generation all
> is ok.
> I added delayFactor in
> https://github.com/rssdev10/spark-kafka-streaming/blob/master/src/main/java/SparkStreamingConsumer.java
> to emulate slow processing. And streaming process is in degradation. When
> delayFactor=0 it looks like stable process.
>
>
> Cheers
>
>
> 2016-07-05 17:51 GMT+02:00 Cody Koeninger :
>>
>> Test by producing messages into kafka at a rate comparable to what you
>> expect in production.
>>
>> Test with backpressure turned on, it doesn't require you to specify a
>> fixed limit on number of messages and will do its best to maintain
>> batch timing.  Or you could empirically determine a reasonable fixed
>> limit.
>>
>> Setting up a kafka topic with way more static messages in it than your
>> system can handle in one batch, and then starting a stream from the
>> beginning of it without turning on backpressure or limiting the number
>> of messages... isn't a reasonable way to test steady state
>> performance.  Flink can't magically give you a correct answer under
>> those circumstances either.
>>
>> On Tue, Jul 5, 2016 at 10:41 AM, rss rss  wrote:
>> > Hi, thanks.
>> >
>> >I know about possibility to limit number of messages. But the problem
>> > is
>> > I don't know number of messages which the system able to process. It
>> > depends
>> > on data. The example is very simple. I need a strict response after
>> > specified time. Something like soft real time. In case of Flink I able
>> > to
>> > setup strict time of processing like this:
>> >
>> > KeyedStream keyed =
>> > eventStream.keyBy(event.userId.getBytes()[0] % partNum);
>> > WindowedStream uniqUsersWin =
>> > keyed.timeWindow(
>> > Time.seconds(10) );
>> > DataStream uniqUsers =
>> > uniq.trigger(ProcessingTimeTrigger.create())
>> > .fold(new Aggregator(), new FoldFunction() {
>> > @Override
>> > public Aggregator fold(Aggregator accumulator, Event value)
>> > throws Exception {
>> > accumulator.add(event.userId);
>> > return accumulator;
>> > }
>> > });
>> >
>> > uniq.print();
>> >
>> > And I can see results every 10 seconds independently on input data
>> > stream.
>> > Is it possible something in Spark?
>> >
>> > Regarding zeros in my example the reason I have prepared message queue
>> > in
>> > Kafka for the tests. If I add some messages after I able to see new
>> > messages. But in any case I need first response after 10 second. Not
>> > minutes
>> > or hours after.
>> >
>> > Thanks.
>> >
>> >
>> >
>> > 2016-07-05 17:12 GMT+02:00 Cody Koeninger :
>> >>
>> >> If you're talking about limiting the number of 

Re: Spark streaming. Strict discretizing by time

2016-07-06 Thread rss rss
Hello,

  thanks, I tried .set("spark.streaming.backpressure.enabled","true"), but
the result is negative. Therefore I have prepared a small test:
https://github.com/rssdev10/spark-kafka-streaming

  How to run:

  git clone https://github.com/rssdev10/spark-kafka-streaming.git
  cd spark-kafka-streaming

  # downloads kafka and zookeeper
  ./gradlew setup

  # run zookeeper, kafka, and start message generation
  ./gradlew test_data_prepare

And in another console just run:
  ./gradlew test_spark

Data generation is easy to interrupt with CTRL-C and to resume with the same
command: ./gradlew test_data_prepare

To stop everything:
  ./gradlew stop_all

The Spark test should produce output every 10 seconds, like:
***
Processing time: 33477
Expected time: 1
Processed messages: 2897866
Message example: {"message": 1,
"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
Recovered json: {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}


message is the number of the first message in the window. Time values are in
milliseconds.

Brief results:

   1. Spark always reads all messages from Kafka after the first connection,
   regardless of the dstream or window duration. It looks like a bug.
   2. When the processing speed in the Spark app is close to the data
   generation speed, all is ok.
   3. I added a delayFactor in
   https://github.com/rssdev10/spark-kafka-streaming/blob/master/src/main/java/SparkStreamingConsumer.java
   to emulate slow processing, and the streaming process degrades (a sketch of such a
   delay is shown after this list). With delayFactor=0 it looks like a stable process.
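
A minimal sketch of such an artificial delay (hypothetical code, not the contents of the
linked SparkStreamingConsumer.java; delayFactor here is just an illustrative knob):

    JavaDStream<String> processed = messages.map(tuple -> {
        // burn time per record to emulate a data-dependent processing cost
        if (delayFactor > 0) {
            Thread.sleep(delayFactor);
        }
        return tuple._2();
    });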


Cheers


2016-07-05 17:51 GMT+02:00 Cody Koeninger :

> Test by producing messages into kafka at a rate comparable to what you
> expect in production.
>
> Test with backpressure turned on, it doesn't require you to specify a
> fixed limit on number of messages and will do its best to maintain
> batch timing.  Or you could empirically determine a reasonable fixed
> limit.
>
> Setting up a kafka topic with way more static messages in it than your
> system can handle in one batch, and then starting a stream from the
> beginning of it without turning on backpressure or limiting the number
> of messages... isn't a reasonable way to test steady state
> performance.  Flink can't magically give you a correct answer under
> those circumstances either.
>
> On Tue, Jul 5, 2016 at 10:41 AM, rss rss  wrote:
> > Hi, thanks.
> >
> >I know about possibility to limit number of messages. But the problem
> is
> > I don't know number of messages which the system able to process. It
> depends
> > on data. The example is very simple. I need a strict response after
> > specified time. Something like soft real time. In case of Flink I able to
> > setup strict time of processing like this:
> >
> > KeyedStream keyed =
> > eventStream.keyBy(event.userId.getBytes()[0] % partNum);
> > WindowedStream uniqUsersWin =
> keyed.timeWindow(
> > Time.seconds(10) );
> > DataStream uniqUsers =
> > uniq.trigger(ProcessingTimeTrigger.create())
> > .fold(new Aggregator(), new FoldFunction() {
> > @Override
> > public Aggregator fold(Aggregator accumulator, Event value)
> > throws Exception {
> > accumulator.add(event.userId);
> > return accumulator;
> > }
> > });
> >
> > uniq.print();
> >
> > And I can see results every 10 seconds independently on input data
> stream.
> > Is it possible something in Spark?
> >
> > Regarding zeros in my example the reason I have prepared message queue in
> > Kafka for the tests. If I add some messages after I able to see new
> > messages. But in any case I need first response after 10 second. Not
> minutes
> > or hours after.
> >
> > Thanks.
> >
> >
> >
> > 2016-07-05 17:12 GMT+02:00 Cody Koeninger :
> >>
> >> If you're talking about limiting the number of messages per batch to
> >> try and keep from exceeding batch time, see
> >>
> >> http://spark.apache.org/docs/latest/configuration.html
> >>
> >> look for backpressure and maxRatePerPartition
> >>
> >>
> >> But if you're only seeing zeros after your job runs for a minute, it
> >> sounds like something else is wrong.
> >>
> >>
> >> On Tue, Jul 5, 2016 at 10:02 AM, rss rss  wrote:
> >> > Hello,
> >> >
> >> >   I'm trying to organize processing of messages from Kafka. And there
> is
> >> > a
> >> > typical case when a number of messages in kafka's queue is more then
> >> > Spark
> >> > app's possibilities to process. But I need a strong time limit to
> >> > prepare
> >> > result for at least for a part of data.
> >> >
> >> > Code example:
> >> >
> >> > SparkConf sparkConf = new SparkConf()
> >> > .setAppName("Spark")
> >> > .setMaster("local");
> >> >
> >> > 

Re: Spark streaming. Strict discretizing by time

2016-07-05 Thread Cody Koeninger
Test by producing messages into kafka at a rate comparable to what you
expect in production.
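
A minimal sketch of such a test producer (hypothetical; it uses the standard
kafka-clients producer API rather than anything from the linked project, and the broker
address, topic name and target rate are made-up values):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class RateLimitedProducer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            long targetPerSecond = 100000;  // production-like rate, chosen for illustration
            while (true) {
                long start = System.currentTimeMillis();
                for (long i = 0; i < targetPerSecond; i++) {
                    producer.send(new ProducerRecord<>("test_topic",
                            "{\"message\": " + i + "}"));
                }
                long elapsed = System.currentTimeMillis() - start;
                if (elapsed < 1000) {
                    Thread.sleep(1000 - elapsed);  // hold roughly the target rate
                }
            }
        }
    }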

Test with backpressure turned on, it doesn't require you to specify a
fixed limit on number of messages and will do its best to maintain
batch timing.  Or you could empirically determine a reasonable fixed
limit.

Setting up a kafka topic with way more static messages in it than your
system can handle in one batch, and then starting a stream from the
beginning of it without turning on backpressure or limiting the number
of messages... isn't a reasonable way to test steady state
performance.  Flink can't magically give you a correct answer under
those circumstances either.

On Tue, Jul 5, 2016 at 10:41 AM, rss rss  wrote:
> Hi, thanks.
>
>I know about possibility to limit number of messages. But the problem is
> I don't know number of messages which the system able to process. It depends
> on data. The example is very simple. I need a strict response after
> specified time. Something like soft real time. In case of Flink I able to
> setup strict time of processing like this:
>
> KeyedStream keyed =
> eventStream.keyBy(event.userId.getBytes()[0] % partNum);
> WindowedStream uniqUsersWin = keyed.timeWindow(
> Time.seconds(10) );
> DataStream uniqUsers =
> uniq.trigger(ProcessingTimeTrigger.create())
> .fold(new Aggregator(), new FoldFunction() {
> @Override
> public Aggregator fold(Aggregator accumulator, Event value)
> throws Exception {
> accumulator.add(event.userId);
> return accumulator;
> }
> });
>
> uniq.print();
>
> And I can see results every 10 seconds independently on input data stream.
> Is it possible something in Spark?
>
> Regarding zeros in my example the reason I have prepared message queue in
> Kafka for the tests. If I add some messages after I able to see new
> messages. But in any case I need first response after 10 second. Not minutes
> or hours after.
>
> Thanks.
>
>
>
> 2016-07-05 17:12 GMT+02:00 Cody Koeninger :
>>
>> If you're talking about limiting the number of messages per batch to
>> try and keep from exceeding batch time, see
>>
>> http://spark.apache.org/docs/latest/configuration.html
>>
>> look for backpressure and maxRatePerPartition
>>
>>
>> But if you're only seeing zeros after your job runs for a minute, it
>> sounds like something else is wrong.
>>
>>
>> On Tue, Jul 5, 2016 at 10:02 AM, rss rss  wrote:
>> > Hello,
>> >
>> >   I'm trying to organize processing of messages from Kafka. And there is
>> > a
>> > typical case when a number of messages in kafka's queue is more then
>> > Spark
>> > app's possibilities to process. But I need a strong time limit to
>> > prepare
>> > result for at least for a part of data.
>> >
>> > Code example:
>> >
>> > SparkConf sparkConf = new SparkConf()
>> > .setAppName("Spark")
>> > .setMaster("local");
>> >
>> > JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>> > Milliseconds.apply(5000));
>> >
>> > jssc.checkpoint("/tmp/spark_checkpoint");
>> >
>> > Set topicMap = new
>> > HashSet<>(Arrays.asList(topicList.split(",")));
>> > Map kafkaParams = new HashMap()
>> > {
>> > {
>> > put("metadata.broker.list", bootstrapServers);
>> > put("auto.offset.reset", "smallest");
>> > }
>> > };
>> >
>> > JavaPairInputDStream messages =
>> > KafkaUtils.createDirectStream(jssc,
>> > String.class,
>> > String.class,
>> > StringDecoder.class,
>> > StringDecoder.class,
>> > kafkaParams,
>> > topicMap);
>> >
>> > messages.countByWindow(Seconds.apply(10),
>> > Milliseconds.apply(5000))
>> > .map(x -> {System.out.println(x); return x;})
>> > .dstream().saveAsTextFiles("/tmp/spark",
>> > "spark-streaming");
>> >
>> >
>> >   I need to see a result of window operation each 10 seconds (this is
>> > only
>> > simplest example). But really with my test data ~10M messages I have
>> > first
>> > result a minute after and further I see only zeros. Is a way to limit
>> > processing time to guarantee a response in specified time like Apache
>> > Flink's triggers?
>> >
>> > Thanks.
>
>




Re: Spark streaming. Strict discretizing by time

2016-07-05 Thread rss rss
Hi, thanks.

   I know about the possibility of limiting the number of messages. But the problem is
that I don't know the number of messages the system is able to process; it
depends on the data. The example is very simple: I need a strict response after a
specified time, something like soft real time. In Flink I am able to
set up a strict processing time like this:

KeyedStream keyed =
        eventStream.keyBy(event.userId.getBytes()[0] % partNum);
WindowedStream uniqUsersWin =
        keyed.timeWindow( Time.seconds(10) );
DataStream uniqUsers =
        uniqUsersWin.trigger(ProcessingTimeTrigger.create())
        .fold(new Aggregator(), new FoldFunction() {
            @Override
            public Aggregator fold(Aggregator accumulator, Event value)
                    throws Exception {
                accumulator.add(value.userId);
                return accumulator;
            }
        });

uniqUsers.print();

And I can see results every 10 seconds, independently of the input data stream.
Is something like this possible in Spark?
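
For comparison, the nearest DStream construct (the one already used in the code quoted
below) fixes the batch and window durations, but unlike the Flink trigger it does not
bound how long each batch takes to process; a slow batch simply delays the next output:

    // 10-second window, evaluated every 5 seconds, over the direct Kafka stream
    messages.countByWindow(Seconds.apply(10), Milliseconds.apply(5000))
            .print();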

Regarding the zeros in my example: the reason is that I pre-populated the message queue in
Kafka for the tests. If I add some messages afterwards, I am able to see the new
messages. But in any case I need the first response after 10 seconds, not
minutes or hours later.

Thanks.



2016-07-05 17:12 GMT+02:00 Cody Koeninger :

> If you're talking about limiting the number of messages per batch to
> try and keep from exceeding batch time, see
>
> http://spark.apache.org/docs/latest/configuration.html
>
> look for backpressure and maxRatePerPartition
>
>
> But if you're only seeing zeros after your job runs for a minute, it
> sounds like something else is wrong.
>
>
> On Tue, Jul 5, 2016 at 10:02 AM, rss rss  wrote:
> > Hello,
> >
> >   I'm trying to organize processing of messages from Kafka. And there is
> a
> > typical case when a number of messages in kafka's queue is more then
> Spark
> > app's possibilities to process. But I need a strong time limit to prepare
> > result for at least for a part of data.
> >
> > Code example:
> >
> > SparkConf sparkConf = new SparkConf()
> > .setAppName("Spark")
> > .setMaster("local");
> >
> > JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> > Milliseconds.apply(5000));
> >
> > jssc.checkpoint("/tmp/spark_checkpoint");
> >
> > Set topicMap = new
> > HashSet<>(Arrays.asList(topicList.split(",")));
> > Map kafkaParams = new HashMap() {
> > {
> > put("metadata.broker.list", bootstrapServers);
> > put("auto.offset.reset", "smallest");
> > }
> > };
> >
> > JavaPairInputDStream messages =
> > KafkaUtils.createDirectStream(jssc,
> > String.class,
> > String.class,
> > StringDecoder.class,
> > StringDecoder.class,
> > kafkaParams,
> > topicMap);
> >
> > messages.countByWindow(Seconds.apply(10),
> Milliseconds.apply(5000))
> > .map(x -> {System.out.println(x); return x;})
> > .dstream().saveAsTextFiles("/tmp/spark",
> "spark-streaming");
> >
> >
> >   I need to see a result of window operation each 10 seconds (this is
> only
> > simplest example). But really with my test data ~10M messages I have
> first
> > result a minute after and further I see only zeros. Is a way to limit
> > processing time to guarantee a response in specified time like Apache
> > Flink's triggers?
> >
> > Thanks.
>


Re: Spark streaming. Strict discretizing by time

2016-07-05 Thread Cody Koeninger
If you're talking about limiting the number of messages per batch to
try and keep from exceeding batch time, see

http://spark.apache.org/docs/latest/configuration.html

look for backpressure and maxRatePerPartition


But if you're only seeing zeros after your job runs for a minute, it
sounds like something else is wrong.


On Tue, Jul 5, 2016 at 10:02 AM, rss rss  wrote:
> Hello,
>
>   I'm trying to organize processing of messages from Kafka. And there is a
> typical case when a number of messages in kafka's queue is more then Spark
> app's possibilities to process. But I need a strong time limit to prepare
> result for at least for a part of data.
>
> Code example:
>
> SparkConf sparkConf = new SparkConf()
> .setAppName("Spark")
> .setMaster("local");
>
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> Milliseconds.apply(5000));
>
> jssc.checkpoint("/tmp/spark_checkpoint");
>
> Set topicMap = new
> HashSet<>(Arrays.asList(topicList.split(",")));
> Map kafkaParams = new HashMap() {
> {
> put("metadata.broker.list", bootstrapServers);
> put("auto.offset.reset", "smallest");
> }
> };
>
> JavaPairInputDStream messages =
> KafkaUtils.createDirectStream(jssc,
> String.class,
> String.class,
> StringDecoder.class,
> StringDecoder.class,
> kafkaParams,
> topicMap);
>
> messages.countByWindow(Seconds.apply(10), Milliseconds.apply(5000))
> .map(x -> {System.out.println(x); return x;})
> .dstream().saveAsTextFiles("/tmp/spark", "spark-streaming");
>
>
>   I need to see a result of window operation each 10 seconds (this is only
> simplest example). But really with my test data ~10M messages I have first
> result a minute after and further I see only zeros. Is a way to limit
> processing time to guarantee a response in specified time like Apache
> Flink's triggers?
>
> Thanks.
