Re: Time-Based Index for Consuming Messages Up to Certain Timestamp

2017-11-24 Thread Matthias J. Sax
Yes and no :)

1) If you use the index to find the "end offset" of your scan ("consume
all messages with a smaller offset"), you would compare the offset of
each message with this "end offset" -- thus, this is the same thing as
consuming the topic from the beginning and comparing the record
timestamp directly (if it's larger, you can stop the scan). What I am
trying to say is that the index doesn't help you build a more efficient
solution :)

The index is only useful if you want to use the offset to *start
reading* from.
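
For what it's worth, a minimal sketch of the case where the index *does*
help -- seeking to a start offset via `offsetsForTimes()`. Topic name,
partition, and the timestamp are placeholder assumptions, not values from
this thread:

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    public class SeekByTimestamp {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("my-topic", 0);   // placeholder
                consumer.assign(Collections.singletonList(tp));

                long startTimestamp = System.currentTimeMillis() - 60 * 60 * 1000L;
                // offsetsForTimes() is backed by the time-based index: it returns the
                // earliest offset whose record timestamp is >= the given timestamp.
                Map<TopicPartition, OffsetAndTimestamp> result =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, startTimestamp));
                OffsetAndTimestamp oat = result.get(tp);
                if (oat != null) {
                    consumer.seek(tp, oat.offset());   // start reading from here
                }
            }
        }
    }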


2) If you stop reading at the "end offset", you do not account for
late-arriving records. Both the time index and a manual linear scan
would stop at the first record with a timestamp larger than your "end
timestamp". However, if you have late-arriving data, you would miss
those records. And since you say you want to sort records into timestamp
order, you obviously have late data (otherwise, there would be no need
to reorder records). Thus, you need to keep scanning for "some more
data" to also find late records -- how much more you want to scan is
something you need to define yourself. For example, you could say: I
read until I see a message with timestamp "end timestamp + 5 minutes",
because I know data is at most 5 minutes late.
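
A rough sketch of that "keep scanning past the end" idea, assuming a plain
consumer loop; the grace period, types, and method names are illustrative
assumptions:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.util.ArrayList;
    import java.util.List;

    public class ScanWithGrace {

        /** Collects all records with timestamp <= endTimestamp, but keeps scanning
         *  until a record more than graceMs past endTimestamp shows up, so that
         *  late-arriving records inside the range are not missed. */
        public static List<ConsumerRecord<String, String>> scan(
                KafkaConsumer<String, String> consumer, long endTimestamp, long graceMs) {
            List<ConsumerRecord<String, String>> inRange = new ArrayList<>();
            boolean done = false;
            while (!done) {                                    // assumes data keeps arriving
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    if (record.timestamp() <= endTimestamp) {
                        inRange.add(record);                   // belongs to the range
                    } else if (record.timestamp() > endTimestamp + graceMs) {
                        done = true;                           // assume no more late data now
                        break;
                    }
                    // records between endTimestamp and endTimestamp + graceMs fall
                    // outside the range themselves, but we keep scanning through them
                    // in case late records for the range still follow.
                }
            }
            return inRange;
        }
    }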


3) Your original problem was to sort data by timestamp -- thus, if you
scan your data, you also need to buffer data in main memory, reorder
out-of-order records, and write back to the output topic. So you still
need a lot of custom code (but I agree, it might be less than if you
use Kafka Streams).
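
And a sketch of the reordering step itself, under the simplifying assumption
that one interval's worth of records fits in memory; topic name and types
are placeholders:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Comparator;
    import java.util.List;

    public class ReorderAndEmit {

        /** Sorts the buffered records by their embedded timestamp and writes them
         *  back out in order. Keeping a whole interval in memory is the simplifying
         *  assumption here; a disk-backed store would be needed for larger windows. */
        public static void emitSorted(List<ConsumerRecord<String, String>> buffer,
                                      KafkaProducer<String, String> producer,
                                      String outputTopic) {
            buffer.sort(Comparator.comparingLong(ConsumerRecord::timestamp));
            for (ConsumerRecord<String, String> r : buffer) {
                // preserve the original timestamp on the re-emitted record
                producer.send(new ProducerRecord<>(outputTopic, null, r.timestamp(),
                                                   r.key(), r.value()));
            }
            producer.flush();
        }
    }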



-Matthias

On 11/24/17 2:26 PM, Ray Ruvinskiy wrote:
> I see, thanks. I suppose I was trying to avoid so much custom logic, which is 
> why I initially was looking at the time-based index functionality. Based on 
> what you’ve said so far, I take it using the time-based index to find an 
> offset corresponding to a timestamp and then consuming all messages with a 
> smaller offset is not a viable solution?
> 
> Ray
> 
> On 2017-11-22, 12:12 AM, "Matthias J. Sax"  wrote:
> 
> Using Kafka Streams, it seems reasonable to implement this using the
> low-level Processor API with a custom state store.
> 
> Thus, you use the `StateStore` interface to implement your state store --
> this allows you to spill to disk if you need to handle state larger
> than main memory.
> 
> If you want to browse some state store examples, you can check out the
> RocksDBStore class that implements Kafka Streams' default `StateStore`.
> 
> Within your custom `Processor` you can access the state accordingly to
> maintain the window etc.
> 
> It's quite a special use case and thus there is not much out-of-the-box
> support. You can check out some basic examples here:
> https://github.com/confluentinc/kafka-streams-examples
> 
> One example implements a custom state store (but only in-memory):
> 
> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.0-post/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala
> 
> Hope this helps.
> 
> 
> -Matthias
> 
> On 11/21/17 5:53 PM, Ray Ruvinskiy wrote:
> > Thanks for your reply! I am quite inexperienced when it comes to Kafka
> > and Kafka Streams and so would appreciate a little more guidance. How
> > would one keep messages within a sliding window sorted by timestamp?
> > Would the sort operation be done all in memory? I would be dealing
> > potentially with hundreds of thousands of messages per partition within
> > every 5-minute interval and so was looking for solutions that were not
> > necessarily limited by the amount of RAM.
> > 
> > Ray
> > 
> > On 2017-11-21, 5:57 PM, "Matthias J. Sax"  wrote:
> > 
> > This is possible, but I think you don't need the time-based index
> > for it :)
> > 
> > You will just buffer up all messages for a 5-minute sliding window and
> > maintain all messages sorted by timestamp in this window. Each time the
> > window "moves" you write the oldest records that "drop out" of the
> > window to the topic. If you get a record with an older timestamp than
> > allowed, you don't insert it into the window but drop it.
> > 
> > The timestamp index is useful if you want to seek to a specific offset
> > based on timestamp. But I don't think you need this for your use case.
> > 
> > 
> > 
> > -Matthias
> > 
> > On 11/21/17 1:39 PM, Ray Ruvinskiy wrote:
> > > I’ve been reading
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
> > > and trying to determine whether I can use the time-based index as an
> > > efficient way to sort a stream of messages into timestamp (CreateTime) order.
> > > 
> > > I am dealing with a 

Re: Recommended settings for Internal Topics

2017-11-24 Thread Debraj Manna
Anyone any thoughts?

If I do not change these values after upgrading to 1.0 on a single-node
Kafka broker, they will take the default value of 3. So what will the
behavior be in this case?
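
For reference, a commonly suggested configuration sketch -- an illustrative
assumption, not an authoritative answer from this thread. The replication
factors cannot exceed the number of brokers, so a single-node broker would
need the value 1:

    # single-node broker: replication factor cannot exceed the broker count
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1

    # 3-node cluster: the 1.0 defaults are usually kept
    # offsets.topic.replication.factor=3
    # transaction.state.log.replication.factor=3
    # transaction.state.log.min.isr=2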

On Fri, Nov 24, 2017 at 3:57 PM, Debraj Manna 
wrote:

> Hi
>
> I am migrating from Kafka 0.10 to the latest 1.0. I did not set any value
> for these fields in Kafka 0.10. Can someone let me know what the
> recommended settings are for a 3-node broker cluster and for a single-node
> broker cluster for the internal topic settings below:
>
>
>- offsets.topic.replication.factor
>- transaction.state.log.replication.factor
>- transaction.state.log.min.isr
>
>
> Thanks,
>


Re: Time-Based Index for Consuming Messages Up to Certain Timestamp

2017-11-24 Thread Ray Ruvinskiy
I see, thanks. I suppose I was trying to avoid so much custom logic, which is 
why I initially was looking at the time-based index functionality. Based on 
what you’ve said so far, I take it using the time-based index to find an offset 
corresponding to a timestamp and then consuming all messages with a smaller 
offset is not a viable solution?

Ray

On 2017-11-22, 12:12 AM, "Matthias J. Sax"  wrote:

Using Kafka Streams, it seems reasonable to implement this using the
low-level Processor API with a custom state store.

Thus, you use the `StateStore` interface to implement your state store --
this allows you to spill to disk if you need to handle state larger
than main memory.

If you want to browse some state store examples, you can check out the
RocksDBStore class that implements Kafka Streams' default `StateStore`.

Within your custom `Processor` you can access the state accordingly to
maintain the window etc.
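
A rough, hypothetical sketch of that Processor-plus-store shape. The store
name "reorder-window", the key choice, and the eviction logic are
assumptions; the store must also be registered on the topology and connected
to this processor, e.g. with a persistent RocksDB-backed store builder:

    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class ReorderProcessor extends AbstractProcessor<String, String> {

        private KeyValueStore<Long, String> window;   // keyed by record timestamp (assumption)

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            super.init(context);
            // "reorder-window" is a hypothetical store name; a persistent store
            // spills to disk when state grows larger than main memory.
            window = (KeyValueStore<Long, String>) context.getStateStore("reorder-window");
        }

        @Override
        public void process(String key, String value) {
            long ts = context().timestamp();   // timestamp of the current record
            // Buffer the record; a real implementation would use a collision-safe
            // composite key and also evict/forward records that drop out of the
            // sliding window.
            window.put(ts, value);
        }
    }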

It's quite a special use case and thus there is not much out-of-the-box
support. You can check out some basic examples here:
https://github.com/confluentinc/kafka-streams-examples

One example implements a custom state store (but only in-memory):

https://github.com/confluentinc/kafka-streams-examples/blob/3.3.0-post/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala

Hope this helps.


-Matthias

On 11/21/17 5:53 PM, Ray Ruvinskiy wrote:
> Thanks for your reply! I am quite inexperienced when it comes to Kafka
> and Kafka Streams and so would appreciate a little more guidance. How
> would one keep messages within a sliding window sorted by timestamp?
> Would the sort operation be done all in memory? I would be dealing
> potentially with hundreds of thousands of messages per partition within
> every 5-minute interval and so was looking for solutions that were not
> necessarily limited by the amount of RAM.
> 
> Ray
> 
> On 2017-11-21, 5:57 PM, "Matthias J. Sax"  wrote:
> 
> This is possible, but I think you don't need the time-based index
> for it :)
> 
> You will just buffer up all messages for a 5-minute sliding window and
> maintain all messages sorted by timestamp in this window. Each time the
> window "moves" you write the oldest records that "drop out" of the
> window to the topic. If you get a record with an older timestamp than
> allowed, you don't insert it into the window but drop it.
> 
> The timestamp index is useful if you want to seek to a specific offset
> based on timestamp. But I don't think you need this for your use case.
> 
> 
> 
> -Matthias
> 
> On 11/21/17 1:39 PM, Ray Ruvinskiy wrote:
> > I’ve been reading
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
> > and trying to determine whether I can use the time-based index as an
> > efficient way to sort a stream of messages into timestamp (CreateTime) order.
> > 
> > I am dealing with a number of sources emitting messages that are then
> > processed in a distributed fashion and written to a Kafka topic. During
> > this processing, the original order of the messages is not strictly
> > maintained. Each message has an embedded timestamp. I’d like to be able
> > to sort these messages back into timestamp order, allowing for a certain
> > lateness interval, before processing them further. For example, supposing
> > the lateness interval is 5 minutes, at time T I’d like to consume from the
> > topic all messages with timestamp up to (T - 5 minutes), in timestamp
> > order. The assumption is that a message should be no more than 5 minutes
> > late; if it is more than 5 minutes late, it can be discarded. Is this
> > something that can be done with the time-based index?
> > 
> > Thanks,
> > 
> > Ray
> > 
> 
> 
> 





Re: parallel processing of records in a Kafka consumer

2017-11-24 Thread Matthias J. Sax
Your understanding is correct.

The simplest way to get more parallelism is to increase the number of
partitions. There is some overhead for this, but it is not too much.
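
A minimal sketch of the pattern described further down in this thread (one
KafkaConsumer per thread, all sharing the same `group.id`); topic name,
settings, and thread count are placeholders:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.util.Collections;
    import java.util.Properties;

    public class ConsumerWorker implements Runnable {
        private final Properties props;

        ConsumerWorker(Properties props) { this.props = props; }

        @Override
        public void run() {
            // Each thread owns its own KafkaConsumer; because they all share the
            // same group.id, partitions are spread across the threads automatically.
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic"));   // placeholder
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        // process the record in this thread
                    }
                }
            }
        }

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder
            props.put("group.id", "parallel-demo");              // same group for every thread
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            int numThreads = 4;   // more threads than partitions would leave some idle
            for (int i = 0; i < numThreads; i++) {
                new Thread(new ConsumerWorker(props)).start();
            }
        }
    }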

>> what you're writing is in sharp contrast with what I know...

I guess this targets other messaging systems: Kafka has a different
design than other messaging systems, because Kafka also targets use
cases other than messaging. It's a stream processing platform.

This design has many advantages (and depending on the use case maybe
also disadvantages -- it's always a trade off). For example, it allows
for very high throughput that messaging systems cannot achieve.

The other approach, using multiple threads per consumer, is hard to get
right -- not impossible, but you need to put a lot of custom code into
place to make this work correctly. The tricky part is committing
offsets. In Kafka, you don't commit each message individually; a commit
of offset X implies that all messages up to X (excluding X) for this
partition got processed successfully (X is the next offset you want to
consume). Thus, if you "branch out" to different threads after the
consumer polled messages, you cannot "randomly commit" but need to put
bookkeeping code in place to make sure you do not commit a message that
was not processed yet.
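
To illustrate that commit semantic, a small single-threaded sketch (types
and the poll timeout are assumptions) that commits `lastProcessedOffset + 1`
per partition only after the records were actually processed:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    import java.util.Collections;

    public class CommitAfterProcessing {

        /** Polls once and commits only what was actually processed: the committed
         *  offset is lastProcessedOffset + 1, i.e. the next offset to consume. */
        public static void pollProcessCommit(KafkaConsumer<String, String> consumer) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (TopicPartition tp : records.partitions()) {
                long lastProcessed = -1L;
                for (ConsumerRecord<String, String> record : records.records(tp)) {
                    // ... fully process the record before advancing ...
                    lastProcessed = record.offset();
                }
                if (lastProcessed >= 0) {
                    consumer.commitSync(Collections.singletonMap(
                            tp, new OffsetAndMetadata(lastProcessed + 1)));
                }
            }
        }
    }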

This is also a design decision and allows for strict in-order message
delivery per partition. Again, a trade-off with advantages and
disadvantages.


Hope this helps :)


-Matthias


On 11/24/17 1:36 AM, Vincenzo D'Amore wrote:
> Hi Matthias,
> 
> what you're writing is in sharp contrast with what I know...
> 
> I read that: "Kafka consumers are typically part of a consumer group. When
> multiple consumers are subscribed to a topic and belong to the same
> consumer group, each consumer in the group will receive messages from a
> different subset of the partitions in the topic."
> 
> https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html
> 
> This means that if there are four partitions, the first consumer within a
> consumer group will read simultaneously from all the existing partitions.
> 
> If we add another consumer, the existing four partitions will be divided
> between the two consumers.
> 
> If we have more consumers than partitions, the excess consumers will remain
> idle.
> 
> https://gist.github.com/freedev/adc3e58789cc23d25d15a7d273535523
> 
> So, if I understood correctly, you cannot have multiple consumers *within
> the same consumer group* concurrently consuming the same partition.
> 
> To be clear, I'm genuinely interested in understanding if I correctly get
> how Kafka consumers work.
> Comments and suggestions are welcome :)
> 
> Best regards,
> Vincenzo
> 
> On Wed, Nov 22, 2017 at 11:15 PM, Matthias J. Sax 
> wrote:
> 
>> A KafkaConsumer itself should be used single-threaded. If you want to
>> parallelize processing, each thread should have its own KafkaConsumer
>> instance and all consumers should use the same `group.id` in their
>> configuration. Load will be shared over all running consumers
>> automatically in this case.
>>
>>
>> -Matthias
>>
>> On 11/22/17 12:22 PM, cours.syst...@gmail.com wrote:
>>> I am testing a KafkaConsumer. How can I modify it to process records in
>>> parallel?
>>>
>>
>>
> 
> 





Re: Regarding Kafka Consumer

2017-11-24 Thread simarpreet kaur
Thanks, Faraz.

I am using its Java API. It does not seem to provide such a method on the
consumer.
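
As far as I know there is indeed no getter for the configuration on the Java
consumer, so a common workaround -- sketched here as an assumption, not an
official API -- is to keep a reference to the Properties the consumer was
built from:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.util.Properties;

    public class ConfiguredConsumer {
        private final Properties props;
        private final KafkaConsumer<String, String> consumer;

        public ConfiguredConsumer(Properties props) {
            this.props = props;
            this.consumer = new KafkaConsumer<>(props);
        }

        public KafkaConsumer<String, String> consumer() {
            return consumer;
        }

        /** The group.id this consumer was created with. */
        public String groupId() {
            return props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
        }
    }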

On Wed, Nov 22, 2017 at 2:45 PM, Faraz Mateen  wrote:

> Not sure which client you are using.
> In kafka-python, consumer.config returns a dictionary with all consumer
> properties.
>
> Thanks,
> Faraz
>
> On Mon, Nov 20, 2017 at 5:34 PM, simarpreet kaur 
> wrote:
>
>> Hello team,
>>
>> I wanted to know if there is some way I can retrieve consumer properties
>> from the Kafka Consumer. For example, if at runtime, I want to know the
>> group id of a particular consumer, in case multiple consumers are running
>> in my application.
>>
>> Thanks & Regards,
>> Simarpreet
>>
>
>


Re: KafkaConsumer current offsets

2017-11-24 Thread simarpreet kaur
Hii Sameer,

You can find the current offset of a consumer using the *position* method
provided by KafkaConsumer. This method takes a topic partition as a
parameter to indicate which partition you want the offset of.
For example, you can do something like
*consumer.position(topicPartition)*

To find the topic partitions assigned to your consumer, you can do
*consumer.assignment()*.
For more information on consumer offsets, you can refer to the blog.
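
A small sketch putting those two calls together (assuming a String-keyed
consumer that has already joined its group via poll()):

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class CurrentOffsets {

        /** Prints the next offset this consumer will read for each assigned
         *  partition. Call after poll() so the assignment is known. */
        public static void printPositions(KafkaConsumer<String, String> consumer) {
            for (TopicPartition tp : consumer.assignment()) {
                long position = consumer.position(tp);   // next offset to be fetched
                // consumer.committed(tp) would give the last committed offset instead
                System.out.printf("%s-%d -> %d%n", tp.topic(), tp.partition(), position);
            }
        }
    }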



On Thu, Nov 23, 2017 at 12:39 PM, Sameer Kumar 
wrote:

> Guys, any thoughts on the below request (getting the current offsets of a
> consumer group through a Java API)?
>
>
>
> On Wed, Nov 22, 2017 at 4:49 PM, Sameer Kumar 
> wrote:
>
> > Hi All,
> >
> > I wanted to know if there is any way to get the current offsets of a
> > consumer group through a java api.
> >
> > -Sameer.
> >
>


Recommended settings for Internal Topics

2017-11-24 Thread Debraj Manna
Hi

I am migrating from Kafka 0.10 to the latest 1.0. I did not set any value
for these fields in Kafka 0.10. Can someone let me know what the
recommended settings are for a 3-node broker cluster and for a single-node
broker cluster for the internal topic settings below:


   - offsets.topic.replication.factor
   - transaction.state.log.replication.factor
   - transaction.state.log.min.isr


Thanks,


Re: parallel processing of records in a Kafka consumer

2017-11-24 Thread Vincenzo D'Amore
Hi Matthias,

what you're writing is in sharp contrast with what I know...

I read that: "Kafka consumers are typically part of a consumer group. When
multiple consumers are subscribed to a topic and belong to the same
consumer group, each consumer in the group will receive messages from a
different subset of the partitions in the topic."

https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html

This means that if there are four partitions, the first consumer within a
consumer group will read simultaneously from all the existing partitions.

If we add another consumer, the existing four partitions will be divided
between the two consumers.

If we have more consumers than partitions, the excess consumers will remain
idle.

https://gist.github.com/freedev/adc3e58789cc23d25d15a7d273535523

So, if I understood correctly, you cannot have multiple consumers *within
the same consumer group* concurrently consuming the same partition.

To be clear, I'm genuinely interested in understanding if I correctly get
how Kafka consumers work.
Comments and suggestions are welcome :)

Best regards,
Vincenzo

On Wed, Nov 22, 2017 at 11:15 PM, Matthias J. Sax 
wrote:

> A KafkaConsumer itself should be used single-threaded. If you want to
> parallelize processing, each thread should have its own KafkaConsumer
> instance and all consumers should use the same `group.id` in their
> configuration. Load will be shared over all running consumers
> automatically in this case.
>
>
> -Matthias
>
> On 11/22/17 12:22 PM, cours.syst...@gmail.com wrote:
> > I am testing a KafkaConsumer. How can I modify it to process records in
> > parallel?
> >
>
>


-- 
Vincenzo D'Amore


Re: auto.offset.reset in 0.11.0.2 Kafka Streams does not take effect.

2017-11-24 Thread Artur Mrozowski
Thank you Matthias!

On Thu, Nov 23, 2017 at 10:34 PM, Matthias J. Sax 
wrote:

> You might want to consider using the reset tool instead of just changing
> the application.id...
>
> https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
>
>
> -Matthias
>
> On 11/23/17 3:34 AM, Artur Mrozowski wrote:
> > Oh, I've got it. Need to reset the application id.
> >
> > On Thu, Nov 23, 2017 at 12:19 PM, Artur Mrozowski 
> wrote:
> >
> >> Hi,
> >> I am running a Kafka Streams application and want to read everything
> >> from the topic from the beginning.
> >> So I renamed all the stores and set
> >> (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),
> >> but it does not take effect. Nothing happens.
> >>
> >> As soon as I put in new messages, the processing and output take off
> >> for the latest messages, but I'd like to consume everything there is
> >> on the topic.
> >>
> >> Not sure what I am missing here.
> >>
> >> /Artur
> >>
> >
>
>
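
For completeness, a minimal sketch of the configuration discussed above
(application id and bootstrap servers are placeholders). Note that
auto.offset.reset only applies when the application id has no committed
offsets yet, which is why resetting or renaming the application id is
needed for it to take effect:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    import java.util.Properties;

    public class ResetConfigExample {
        public static Properties streamsProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // Only consulted when the application id has *no* committed offsets yet,
            // which is why the offsets have to be reset (or the id changed) first.
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return props;
        }
    }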