Kafka performance on an ordinary machine

2016-11-08 Thread Majid Golshadi
Hello
We want to use Kafka in our production environment, but we don't have any
information about the best server configuration or the best benchmark that
can be achieved in our environment (based on our hardware and VMs).
I'm really a rookie in this area.
What procedure would you propose for arriving at the best configuration for
our production environment?

Suppose I want to run Kafka on 3 machines with the following configuration:
   CPU: 2 cores
   RAM: 4 GB
   HDD: 7500 rpm
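
One common procedure is to start with a synthetic load test against a topic that mirrors your expected message size and replication, and then tune from there. A rough sketch with the Java producer is below (the bundled kafka-producer-perf-test tool does essentially the same thing); broker addresses, topic name, message size and count are placeholders:

Properties props = new Properties();
props.put("bootstrap.servers", "host1:9092,host2:9092,host3:9092"); // your three machines
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

byte[] payload = new byte[1024];      // e.g. 1 KB messages
int count = 1_000_000;
long start = System.currentTimeMillis();
try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
    for (int i = 0; i < count; i++) {
        producer.send(new ProducerRecord<>("perf-test", payload)); // fire-and-forget send
    }
    producer.flush(); // wait until everything buffered has actually been sent
}
long elapsedMs = System.currentTimeMillis() - start;
System.out.printf("%d msgs in %d ms (~%.0f msgs/sec)%n", count, elapsedMs, count * 1000.0 / elapsedMs);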


Re: sliding ktable?

2016-11-08 Thread R Krishna
Yes, thanks.


Re: Cleanup partition offsets that exist for consumer groups but not in broker

2016-11-08 Thread Guozhang Wang
Hello Jeff,

Generally speaking ZK's stored offset paths should not be used as the
"source-of-truth" to determine which topic-partitions exist in the Kafka
cluster, but instead the broker topics path should be treated as the
"source-of-truth". More specifically, the common usage pattern would be:

1. Check whether the topic-partition exists in ZK, or via a
MetadataRequest / Response to any of the brokers (note the former is
still the source of truth, while the latter only caches a recent
snapshot of the metadata in ZK).

2. Then try to fetch the offsets from ZK (or from Kafka, if you are on
0.8.2+) for the known topic-partitions.
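
A rough sketch of those two steps with the Java consumer (0.9+) might look like this; the topic name is a placeholder and the consumer is assumed to be configured with your bootstrap servers, group id and deserializers:

// 1. which topic-partitions actually exist, according to broker metadata
List<PartitionInfo> partitions = consumer.partitionsFor("my-topic");
for (PartitionInfo p : partitions) {
    TopicPartition tp = new TopicPartition(p.topic(), p.partition());
    // 2. the stored offset for each known partition, or null if nothing was committed
    OffsetAndMetadata committed = consumer.committed(tp);
    System.out.println(tp + " -> " + committed);
}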


As for deleting the stored offsets in Kafka, there are indeed some other
use cases for this feature (e.g. you no longer want to resume from the
committed offsets but rather start from the beginning or the LEO, etc.).
We do not yet have an admin request for deleting such offsets, but one can
achieve the same effect by 1) starting a consumer with the same group id,
2) resetting its position to 0 or the LEO, and 3) committing offsets, so that
the stored offsets are effectively reset.
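
A rough sketch of that manual reset with the Java consumer (topic and partition are placeholders; the Collection-based assign/seek calls shown are the 0.10.x signatures):

// 1) consumer created with the same group.id as the group whose offsets you want to reset
TopicPartition tp = new TopicPartition("my-topic", 0);
consumer.assign(Collections.singletonList(tp));
// 2) move to the desired position: the beginning here, or seekToEnd(...) for the LEO
consumer.seekToBeginning(Collections.singletonList(tp));
// 3) commit so the stored offset is overwritten
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(consumer.position(tp))));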

Guozhang

On Thu, Nov 3, 2016 at 5:25 PM, Jeff Widman  wrote:

> We hit an error in some custom monitoring code for our Kafka cluster where
> the root cause was that zookeeper was storing offsets for some partitions of
> consumer groups, but those partitions didn't actually exist on the brokers.
>
> Apparently in the past, some colleagues needed to reset a stuck cluster
> caused by corrupted data. So they wiped out the data log files on disk for
> some topics, but didn't wipe the consumer offsets.
>
> In an ideal world this situation should never happen. However, things like
> this do happen in the real world.
>
> Couple of questions:
> 1) This is pretty easy to clean up through the Zookeeper CLI, but how would we
> clean this up if we were instead storing offsets in Kafka?
>
> 2) From an operational perspective, I'm sure we're not the only ones to hit
> this, so I think there should be a simple command/script to clean this up
> that is a) packaged with Kafka, and b) documented. Does this currently
> exist?
>
> 3) I also think it'd be nice if Kafka automatically checked for this error
> case and logged a warning. I wouldn't want automatic cleaning, because if
> this situation occurs, something is screwy and I'd want to minimize what's
> changing while I tried to debug. Is this a reasonable request?
>
> Cheers,
> Jeff
>



-- 
-- Guozhang


Re: consumer client pause/resume/rebalance

2016-11-08 Thread Gwen Shapira
Yeah, we should mention that in the javadoc. Want to send a PR?

I like the "worse is better" philosophy - although we occasionally
choose a more complex implementation in exchange for simpler APIs
(especially when it comes to admin configurations).
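
For anyone looking for a concrete workaround in the meantime, a rough sketch of re-pausing previously paused partitions from the rebalance listener might look like this (names are made up; pause() takes a Collection in 0.10.x and a TopicPartition array in 0.9):

// track what the application itself has paused; add/remove here wherever you call pause()/resume()
final Set<TopicPartition> pausedByUs = new HashSet<>();

consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
        // nothing to do; the old assignment (and its pause flags) is going away
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> assigned) {
        // re-pause whatever we had paused and still own after the rebalance
        Set<TopicPartition> stillOwned = new HashSet<>(assigned);
        stillOwned.retainAll(pausedByUs);
        consumer.pause(stillOwned);
    }
});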

On Tue, Nov 8, 2016 at 2:34 AM, Paul Mackles  wrote:
> Hi Gwen - Makes sense. The way you explain it actually reminds me a little of 
> the "worse is better" philosophy: https://www.jwz.org/doc/worse-is-better.html
>
>
> Perhaps a mention in the javadoc for pause() and/or ConsumerRebalanceListener 
> would be sufficient.
>
> 
> From: Gwen Shapira 
> Sent: Monday, November 07, 2016 3:34:39 PM
> To: Users
> Subject: Re: consumer client pause/resume/rebalance
>
> I think the current behavior is fairly reasonable. Following a
> rebalance the entire state of the consumer changes - you may get an
> entirely new set of partitions. A common use-case for pause is to
> allow a consumer to keep polling and avoid getting new events while it
> is retrying to process existing events - well, following a rebalance,
> it is possible that another consumer owns the partition, is already
> re-processing these events and the entire state needs to be reset.
>
> I usually recommend developers to treat rebalance as a restart (since
> you are getting a whole new set of partitions) and just follow
> whatever process you'd follow to set up after a restart. Since pauses
> don't survive restarts, I wouldn't expect them to survive a rebalance
> either.
>
> I hope this helps explain the behavior?
>
> On Mon, Nov 7, 2016 at 9:53 AM, Paul Mackles  wrote:
>> Using the v0.9.0.1 consumer API, I recently learned that paused partitions
>> can unexpectedly become unpaused during a rebalance. I also found an
>> old thread from the mailing list which corroborates this behavior:
>>
>>
>> http://grokbase.com/t/kafka/users/161wgzckze/new-consumer-pause-reset-behaviour
>>
>>
>> While I can maintain the partition state myself, it seems like it would be a lot
>> easier if this were either handled internally by the consumer API (i.e.
>> pausing the partitions that were previously paused before resuming) and/or
>> the partition state were made available to the RebalanceListener.
>>
>>
>> I did not find any existing tickets in JIRA related to this so I am 
>> wondering if this is a valid bug/enhancement or if someone found a decent 
>> workaround. All of the consumer API examples that I have found do not appear 
>> to handle this scenario.
>>
>>
>> Here is the code snippet from the client I have been working on:
>>
>>
>> consumer.pause(consumer.assignment().toArray(EMPTYTPARRAY));
>>
>> while (!isWritable()) {
>>   // WARNING: if there is a rebalance, this call may return some records!!!
>>   consumer.poll(0);
>>   Uninterruptibles.sleepUninterruptibly(pauseWait, TimeUnit.MILLISECONDS);
>> }
>>
>> consumer.resume(consumer.assignment().toArray(EMPTYTPARRAY));
>>
>>
>> Thanks,
>>
>> Paul
>>
>>
>>
>
>
>
> --
> Gwen Shapira
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread Matthias J. Sax

My two cents:

Changelog topics are compacted topics, thus they do not have a
retention time (there is an exception for windowed KTable changelog
topics, which are compacted and do have a retention time).

However, I do not understand how changing the retention time should fix
the issue. If your list of values grows and exceeds max.message.bytes,
you will need to increase this parameter (or shrink your value).

Besides this, Eno's answer is the way to go. In order to figure out
internal topic names, you can use KafkaStreams#toString().
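
For example, a rough sketch (the topology line is a stand-in for the real one; application id, broker and store names are placeholders):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");

KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-topic").groupByKey().count("counts-store");   // stand-in for the real topology

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
// once the instance is up, this prints its tasks and topics,
// including the internal changelog topic names
System.out.println(streams.toString());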


- -Matthias



On 11/8/16 11:14 AM, Eno Thereska wrote:
> Hi Sachin,
> 
> One option right now would be to precreate all internal topics in
> Kafka, and only after that start the Kafka Streams application.
> This would require you knowing the internal name of the topics (in
> this case you probably already know it, but I agree that in general
> this is a bit cumbersome).
> 
> Eno
> 
>> On 8 Nov 2016, at 18:10, Sachin Mittal 
>> wrote:
>> 
>> Per message payload size. The basic question is how can I control
>> the internal change log topics parameters so as to avoid these
>> errors.
>> 
>> 
>> On Tue, Nov 8, 2016 at 11:37 PM, R Krishna 
>> wrote:
>> 
>>> Are you talking about total messages and therefore size or per
>>> message payload size.
>>> 
>>> On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal
>>>  wrote:
>>> 
 Message size itself increases over the time.
 
 Message is something like key=[list on objects]
 
 This increases with time and then at a point kafka is not
 able to add any message to its topic because message size is
 greater than max.message.bytes. Since this is an internal
 topic based off a table I don't know how can I control this
 topic.
 
 If I can set some retention.ms for this topic then I can
 purge old messages thereby ensuring that message size stays
 within limit.
 
 Thanks Sachin
 
 
 
 On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska
  wrote:
 
> Hi Sachin,
> 
> Could you clarify what you mean by "message size
> increases"? Are
>>> messages
> going to the changelog topic increasing in size? Or is the
> changelog
 topic
> getting full?
> 
> Thanks Eno
> 
>> On 8 Nov 2016, at 16:49, Sachin Mittal
>>  wrote:
>> 
>> Hi, We are using aggregation by key on a kstream to
>> create a ktable. As I read from 
>> https://cwiki.apache.org/confluence/display/KAFKA/
> Kafka+Streams%3A+Internal+Data+Management
>> it creates an internal changelog topic.
>> 
>> However over the time the streaming application is run
>> message size increases and it starts throwing
>> max.message.bytes exception.
>> 
>> Is there a way to control the retention.ms time for
>> internal
>>> changelog
>> topics so that messages are purged before they exceed
>> this size.
>> 
>> If not is there a way to control or avoid such an error.
>> 
>> Thanks Sachin
> 
> 
 
>>> 
>>> 
>>> 
>>> -- Radha Krishna, Proddaturi 253-234-5657
>>> 
> 


Re: sliding ktable?

2016-11-08 Thread Matthias J. Sax

Yes and no.

Kafka allows you to set a retention time for compacted topics, too.
Thus, if a key does not get an update for this retention time, it will
be deleted, too.

See here for details:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist
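
As a sketch, the KIP-71 topic-level settings (available from 0.10.1) would look roughly like this; the retention value is just an example:

Properties topicConfig = new Properties();
topicConfig.put("cleanup.policy", "compact,delete");   // compaction plus time-based deletion on the same topic
topicConfig.put("retention.ms", String.valueOf(14L * 24 * 60 * 60 * 1000));   // e.g. two weeks
// apply via kafka-configs.sh / kafka-topics.sh --config when creating or altering the topic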

- -Matthias


On 11/7/16 11:44 PM, R Krishna wrote:
> There is a problem with tombstoning old entries based on a new
> entry: keys which never get a new entry will remain there
> forever.
> 
> On Mon, Nov 7, 2016 at 9:38 AM, Matthias J. Sax
>  wrote:
> 
> John,
> 
> your thinking is on the right track!
> 
> About the infinitely growing KTable: It seems you are extending each
> lane with a list of all txnIds -- so your view needs infinite memory
> as you extend your values... A quick fix might be to delete older
> txnIDs from this list each time you update it (as you
> mentioned you only need data for the last two weeks -- you might
> need to add a timestamp for each txnID in the list to do the
> pruning each time you append to or look up the list).
> 
 Ideally if the topic is set to two weeks retention, then once
 an item is 'popped off' I would like to do an aggregate
 subtraction for its value.  But I don't think this is how
 kafka works.  Is this possible?  Any other
 feedback/suggestion? Perhaps a better approach?
> 
> There is no Kafka support for this. You would need to go with the
> suggestion described above. The only "delete" mechanism Kafka offers
> is for compacted topics via tombstone messages (ie, a message with
> <key, null> format; value == null). However, a tombstone deletes
> the whole record for its key, thus I doubt tombstones are useful for
> your case.
> 
> However, reading through your email, I am wondering why you
> need all the old txnIds. You mentioned that you want to get the
> previous txnId for each duplicate (and your example result verifies
> this). Thus, it would be sufficient to only store the latest txnId
> for each "lane" IMHO. Furthermore, for this deduplication it seems
> sufficient to only use a KTable without a join.
> 
> The idea would be as follows: You consume your stream as a
> changelog (ie, KTable). For each record, you check if there is an
> entry in the view. If not, just put the record itself as the result,
> because there is no duplicate. If you do find an entry, the
> current record is a duplicate of the record found. The record
> found contains its txnId, so you can use this as the "previous
> txnId". As the result, you store the current record. Your data format
> would be like <lane, (txnId, txnDate)> (for input) and
> <lane, (txnId, txnDate, prevTxnId)> (for output).
>
> Your stream and view would be like:
> 
> {'c',('03','11/07/2016')} plus state: EMPTY
> 
> => {'c',('03','11/07/2016','')}  // this is output and state
> update at the same time
> 
> 
> 
> {'c',('09','11/07/2016')} plus state:
> {'c',('03','11/07/2016',null)}
> 
> => {'c',('09','11/07/2016','03')} // this is output and state 
> update at the same time
> 
> 
> 
> {'c',('11','11/08/2016')} plus state:
> {'c',('09','11/07/2016','03')}
> 
> => {'c',('11','11/08/2016','09')} // this is output and state 
> update at the same time
> 
> 
> -Matthias
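
A plain-Java sketch of the lookup-and-update logic described above, ignoring the Streams plumbing (in a real topology this map would be a state store; all names are made up):

Map<String, String> lastTxnIdByLane = new HashMap<>();

// called once per incoming record <lane, (txnId, txnDate)>
String[] dedup(String lane, String txnId, String txnDate) {
    String previousTxnId = lastTxnIdByLane.get(lane);   // null if this lane has not been seen yet
    lastTxnIdByLane.put(lane, txnId);                   // the current record becomes the latest for this lane
    return new String[] { txnId, txnDate, previousTxnId };   // i.e. <lane, (txnId, txnDate, prevTxnId)>
}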
> 
> On 11/7/16 8:22 AM, John Hayles wrote:
 Thanks for the reply.  I really appreciate the insight.
 Again newbie here.  I want to expand on what I am struggling
 with.  It may be that I just need to get my mind thinking
 more in a streaming mode.  Please let me know your thoughts.
 Just having a problem ‘getting it’ on my own.
 
 
 
 Below is a simple topic I want to identify where the 'lane' 
 duplicates, and when it does get the 'txnId' of the
 duplicate record.  The txnId is distinct and will never be
 duplicate.  The lane will seldom have a duplicate.
 
 
 
 
 
 Topic payload {txnId,lane,txnDate}  Notice lane 'c' is
 duplicated 3 times.
 
 
 
 {'01','wfasd','11/07/2016'}
 
 {'02','bas','11/07/2016'}
 
 {'03','c','11/07/2016'}
 
 {'04','xxwq','11/07/2016'}
 
 {'05','dasf','11/07/2016'}
 
 {'06','drdd','11/07/2016'}
 
 {'07','tasd','11/07/2016'}
 
 {'08','ywq','11/07/2016'}
 
 {'09','c','11/07/2016'}
 
 {'10','jda','11/07/2016'}
 
 {'11','c','11/08/2016'}
 
 {'12','ozs','11/09/2016'}
 
 . . .
 
 Note txnId and lane keep getting more distinct values.
 
 
 
 
 
 My thought is to join the data to itself,  one as kstream the
 other as ktable for lookups.
 
 
 
 kstream as
 
 
 
 {lane:(txnId,txnDate)}
 
 
 
 so I visualize like ...
 
 
 
 ('wfasd':('01','11/07/2016')),
 
 ('bas'  :('02','11/07/2016')),
 
 ('c':('03','11/07/2016')), ...
 
 
 
 The ktable (lookup table) is an aggregate view I built to
 hold historic data by lan

kafka topics failed UnknownHostException

2016-11-08 Thread M.Z.
Hi, Guys. Any idea about this error?

I installed and added the Kafka service through Cloudera Manager, and that part
looks fine.

But when I try to test it by listing topics, it gives an error like:

./kafka-topics.sh --list --zookeeper localhost:2181



java.net.UnknownHostException: opt

at java.net.AbstractPlainSocketImpl.connect(AbstractP
lainSocketImpl.java:178)



I don't know where the host 'opt' comes from.


Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread Eno Thereska
Hi Sachin,

One option right now would be to precreate all internal topics in Kafka, and 
only after that start the Kafka Streams application. This would require you 
knowing the internal name of the topics (in this case you probably already know 
it, but I agree that in general this is a bit cumbersome).
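
As a sketch, the internal topic name follows the "<application.id>-<store name>-changelog" convention, so it can be derived and created up front with a larger size limit (names and values below are placeholders):

String applicationId = "my-streams-app";       // the StreamsConfig application.id
String storeName = "my-aggregate-store";       // the store name passed to aggregate()
String changelogTopic = applicationId + "-" + storeName + "-changelog";

Properties topicConfig = new Properties();
topicConfig.put("cleanup.policy", "compact");
topicConfig.put("max.message.bytes", "10485760");   // e.g. 10 MB, above the ~1 MB default
// create changelogTopic with these configs (kafka-topics.sh --create --config ...) before starting the app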

Eno

> On 8 Nov 2016, at 18:10, Sachin Mittal  wrote:
> 
> Per message payload size. The basic question is how can I control the
> internal change log topics parameters so as to avoid these errors.
> 
> 
> On Tue, Nov 8, 2016 at 11:37 PM, R Krishna  wrote:
> 
>> Are you talking about total messages and therefore size or per message
>> payload size.
>> 
>> On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal  wrote:
>> 
>>> Message size itself increases over the time.
>>> 
>>> Message is something like
>>> key=[list on objects]
>>> 
>>> This increases with time and then at a point kafka is not able to add any
>>> message to its topic because message size is greater than
>>> max.message.bytes.
>>> Since this is an internal topic based off a table I don't know how can I
>>> control this topic.
>>> 
>>> If I can set some retention.ms for this topic then I can purge old
>>> messages
>>> thereby ensuring that message size stays within limit.
>>> 
>>> Thanks
>>> Sachin
>>> 
>>> 
>>> 
>>> On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska 
>>> wrote:
>>> 
 Hi Sachin,
 
 Could you clarify what you mean by "message size increases"? Are
>> messages
 going to the changelog topic increasing in size? Or is the changelog
>>> topic
 getting full?
 
 Thanks
 Eno
 
> On 8 Nov 2016, at 16:49, Sachin Mittal  wrote:
> 
> Hi,
> We are using aggregation by key on a kstream to create a ktable.
> As I read from
> https://cwiki.apache.org/confluence/display/KAFKA/
 Kafka+Streams%3A+Internal+Data+Management
> it creates an internal changelog topic.
> 
> However over the time the streaming application is run message size
> increases and it starts throwing max.message.bytes exception.
> 
> Is there a way to control the retention.ms time for internal
>> changelog
> topics so that messages are purged before they exceed this size.
> 
> If not is there a way to control or avoid such an error.
> 
> Thanks
> Sachin
 
 
>>> 
>> 
>> 
>> 
>> --
>> Radha Krishna, Proddaturi
>> 253-234-5657
>> 



Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread Sachin Mittal
Per-message payload size. The basic question is: how can I control the
internal changelog topic's parameters so as to avoid these errors?


On Tue, Nov 8, 2016 at 11:37 PM, R Krishna  wrote:

> Are you talking about total messages and therefore size or per message
> payload size.
>
> On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal  wrote:
>
> > Message size itself increases over the time.
> >
> > Message is something like
> > key=[list on objects]
> >
> > This increases with time and then at a point kafka is not able to add any
> > message to its topic because message size is greater than
> > max.message.bytes.
> > Since this is an internal topic based off a table I don't know how can I
> > control this topic.
> >
> > If I can set some retention.ms for this topic then I can purge old
> > messages
> > thereby ensuring that message size stays within limit.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska 
> > wrote:
> >
> > > Hi Sachin,
> > >
> > > Could you clarify what you mean by "message size increases"? Are
> messages
> > > going to the changelog topic increasing in size? Or is the changelog
> > topic
> > > getting full?
> > >
> > > Thanks
> > > Eno
> > >
> > > > On 8 Nov 2016, at 16:49, Sachin Mittal  wrote:
> > > >
> > > > Hi,
> > > > We are using aggregation by key on a kstream to create a ktable.
> > > > As I read from
> > > > https://cwiki.apache.org/confluence/display/KAFKA/
> > > Kafka+Streams%3A+Internal+Data+Management
> > > > it creates an internal changelog topic.
> > > >
> > > > However over the time the streaming application is run message size
> > > > increases and it starts throwing max.message.bytes exception.
> > > >
> > > > Is there a way to control the retention.ms time for internal
> changelog
> > > > topics so that messages are purged before they exceed this size.
> > > >
> > > > If not is there a way to control or avoid such an error.
> > > >
> > > > Thanks
> > > > Sachin
> > >
> > >
> >
>
>
>
> --
> Radha Krishna, Proddaturi
> 253-234-5657
>


Kafka ACL Groups/Wildcards

2016-11-08 Thread Bryan Baugher
Hi everyone,

I've been trying out Kafka security and was curious whether there are
plans/issues to add wildcards in resources or user-group support to Kafka
ACLs? If they are already implemented, please point me to the docs on how to use them.

Specifically, it would be nice to give groups of users access to things
instead of having to call out each specific user. Additionally, it would be
nice to be able to use wildcards for topics and consumer groups.


Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread R Krishna
Are you talking about total messages and therefore size or per message
payload size.

On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal  wrote:

> Message size itself increases over the time.
>
> Message is something like
> key=[list on objects]
>
> This increases with time and then at a point kafka is not able to add any
> message to its topic because message size is greater than
> max.message.bytes.
> Since this is an internal topic based off a table I don't know how can I
> control this topic.
>
> If I can set some retention.ms for this topic then I can purge old
> messages
> thereby ensuring that message size stays within limit.
>
> Thanks
> Sachin
>
>
>
> On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska 
> wrote:
>
> > Hi Sachin,
> >
> > Could you clarify what you mean by "message size increases"? Are messages
> > going to the changelog topic increasing in size? Or is the changelog
> topic
> > getting full?
> >
> > Thanks
> > Eno
> >
> > > On 8 Nov 2016, at 16:49, Sachin Mittal  wrote:
> > >
> > > Hi,
> > > We are using aggregation by key on a kstream to create a ktable.
> > > As I read from
> > > https://cwiki.apache.org/confluence/display/KAFKA/
> > Kafka+Streams%3A+Internal+Data+Management
> > > it creates an internal changelog topic.
> > >
> > > However over the time the streaming application is run message size
> > > increases and it starts throwing max.message.bytes exception.
> > >
> > > Is there a way to control the retention.ms time for internal changelog
> > > topics so that messages are purged before they exceed this size.
> > >
> > > If not is there a way to control or avoid such an error.
> > >
> > > Thanks
> > > Sachin
> >
> >
>



-- 
Radha Krishna, Proddaturi
253-234-5657


Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread Sachin Mittal
Message size itself increases over time.

Message is something like
key=[list of objects]

This increases with time, and then at a point kafka is not able to add any
message to its topic because the message size is greater than max.message.bytes.
Since this is an internal topic based off a table, I don't know how I can
control this topic.

If I can set some retention.ms for this topic then I can purge old messages,
thereby ensuring that the message size stays within the limit.

Thanks
Sachin



On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska 
wrote:

> Hi Sachin,
>
> Could you clarify what you mean by "message size increases"? Are messages
> going to the changelog topic increasing in size? Or is the changelog topic
> getting full?
>
> Thanks
> Eno
>
> > On 8 Nov 2016, at 16:49, Sachin Mittal  wrote:
> >
> > Hi,
> > We are using aggregation by key on a kstream to create a ktable.
> > As I read from
> > https://cwiki.apache.org/confluence/display/KAFKA/
> Kafka+Streams%3A+Internal+Data+Management
> > it creates an internal changelog topic.
> >
> > However over the time the streaming application is run message size
> > increases and it starts throwing max.message.bytes exception.
> >
> > Is there a way to control the retention.ms time for internal changelog
> > topics so that messages are purged before they exceed this size.
> >
> > If not is there a way to control or avoid such an error.
> >
> > Thanks
> > Sachin
>
>


Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread Eno Thereska
Hi Sachin,

Could you clarify what you mean by "message size increases"? Are messages going 
to the changelog topic increasing in size? Or is the changelog topic getting 
full? 

Thanks
Eno

> On 8 Nov 2016, at 16:49, Sachin Mittal  wrote:
> 
> Hi,
> We are using aggregation by key on a kstream to create a ktable.
> As I read from
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
> it creates an internal changelog topic.
> 
> However over the time the streaming application is run message size
> increases and it starts throwing max.message.bytes exception.
> 
> Is there a way to control the retention.ms time for internal changelog
> topics so that messages are purged before they exceed this size.
> 
> If not is there a way to control or avoid such an error.
> 
> Thanks
> Sachin



Re: Protecting kafka-producer against unavailability of all brokers (request.timeout.ms)

2016-11-08 Thread sutambe
I agree that the accumulator timeout should be independent of the other two you
mentioned. We at LinkedIn have come up with a solution and I'll create a KIP
for it soon. In essence, we want a batch.expiry.ms configuration that directly
specifies the accumulator timeout separately from request.timeout. Proliferation of
request.timeout "up" the stack has been painful. There are a number of nuances
to it and they will be discussed in the KIP. Stay tuned.

-Sumant

Sent from my iPad

> On Nov 8, 2016, at 8:23 AM, Lukasz Druminski 
>  wrote:
> 
> Hi,
> 
> We are using kafka-producer 0.8.2 on our production. We configured it with
> retries to Integer.MAX_VALUE and buffer.memory to 1GB.
> Thanks to this setup we are protected from unavailability of all brokers
> for around one hour (taking into account our production traffic).
> For example, when all brokers from a single DC/zone are down,
> kafka-producer buffers all incoming messages in its accumulator until full.
> When brokers are available again, the producer sends all the buffered
> messages to kafka. Thanks to this we have some time for recovery and don't
> loose messages at all.
> 
> Now, we would like to migrate to the newest kafka-producer 0.10.1 but we
> have a problem with preserving described behaviour because of changes
> introduced to producer library:
> 
> - proposal about adding request timeout to NetworkClient
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
> - producer record can stay in RecordAccumulator forever if leader is not
> available https://issues.apache.org/jira/browse/KAFKA-1788
> - add a request timeout to NetworkClient
> https://issues.apache.org/jira/browse/KAFKA-2120
> 
> These changes provide request.timeout.ms parameter which is used in:
> 
> 1. actual network RTT
> 2. server replication time
> 3. new mechanism for aborting expired batches
> 
> When brokers are unavailable for more than request.timeout.ms then
> kafka-producer starts dropping batches from accumulator with a
> TimeoutException in a callback with a message:
> 
>  "Batch containing " + recordCount + " record(s) expired due to timeout
> while requesting metadata from brokers for " + topicPartition
> 
> As a possible solution, to protect against unavailability of all brokers,
> in the newest kafka-producer:
> 
> - I could increase request.timeout.ms to one hour and batches would be
> dropped after that time but this value is not reasonable for (1) and (2)
> - I could catch TimeoutException and send corresponding message to
> kafka-producer again but then I don’t have guarantee that there will be
> free space in accumulator
> 
> In my opinion timeout for (3) should be independent from (1) and (2), or
> dropping expired batches should be an optional feature.
> What do you think about this issue? Do you have any suggestion/solution for
> this use case?
> 
> Best regards,
> Luke Druminski


[kafka] Errors during failover

2016-11-08 Thread Frederic Girard
Hello,

We're planning to use kafka (0.10.1), so we tested it. I've done some fail-over 
tests, with unexpected results.

We have 3 servers, each one running a kafka broker. We created 3 message
queues (MSG01, MSG02, MSG03).
Each message queue has only 1 partition and a replication factor of 3.

Topic:MSG01   PartitionCount:1   ReplicationFactor:3   Configs:
    Topic: MSG01   Partition: 0   Leader: 1   Replicas: 1,0,2   Isr: 1,0,2
Topic:MSG02   PartitionCount:1   ReplicationFactor:3   Configs:
    Topic: MSG02   Partition: 0   Leader: 1   Replicas: 1,2,0   Isr: 1,2,0
Topic:MSG03   PartitionCount:1   ReplicationFactor:3   Configs:
    Topic: MSG03   Partition: 0   Leader: 1   Replicas: 1,2,0   Isr: 1,2,0


Then we start to send messages to kafka and receive them using a jmeter script 
(~200 messages sent per second).

*  11:06:22 : PB02 kafka server is killed. (SIGTERM)
*  11:08:10 : PB02 kafka server is restarted 2 minutes later.
*  11:09:23 : 204 errors
*  11:13:35 : PB02 kafka server is killed (SIGTERM): 2 errors
*  11:30:27 : PB02 kafka server is restarted 17 minutes later.
*  11:34:23 : 202 errors
*  11:47:20 : PB01 kafka server is killed. (SIGTERM)
*  11:52:05 : PB01 kafka server is restarted 5 minutes later.
*  11:56:02 : 15 errors
*  11:56:02 : PB02 kafka server is killed. (SIGTERM)
*  12:00:02 : PB02 kafka server is restarted 4 minutes later.
*  12:02:28 : 207 errors

When we shut down a broker and then restart it, nothing happens (maybe a few
errors). But some minutes later, we get a lot of errors.
I've run this test many times; it always behaves this way.

When these errors happen, here's the log we get:

[2016-10-20 11:09:23,833] INFO [ReplicaFetcherManager on broker 2] Removed 
fetcher for partitions [MSG01,0] (kafka.server.ReplicaFetcherManager)
[2016-10-20 11:09:23,833] INFO Truncating log MSG01-0 to offset 30117. 
(kafka.log.Log)
[2016-10-20 11:09:23,837] INFO [ReplicaFetcherManager on broker 2] Added 
fetcher for partitions List([[MSG01,0], initOffset 30117 to broker 
BrokerEndPoint(1,perf-bench-02,9092)] ) (kafka.server.ReplicaFetcherManager)
[2016-10-20 11:09:23,838] INFO [ReplicaFetcherThread-0-1], Starting  
(kafka.server.ReplicaFetcherThread)
[2016-10-20 11:09:23,839] INFO [ReplicaFetcherThread-0-0], Shutting down 
(kafka.server.ReplicaFetcherThread)
[2016-10-20 11:09:23,840] INFO [ReplicaFetcherThread-0-0], Stopped  
(kafka.server.ReplicaFetcherThread)
[2016-10-20 11:09:23,840] INFO [ReplicaFetcherThread-0-0], Shutdown completed 
(kafka.server.ReplicaFetcherThread)
[2016-10-20 11:09:23,845] INFO [ReplicaFetcherManager on broker 2] Removed 
fetcher for partitions [MSG03,0] (kafka.server.ReplicaFetcherManager)
[2016-10-20 11:09:23,845] INFO Truncating log MSG03-0 to offset 29935. 
(kafka.log.Log)
[2016-10-20 11:09:23,847] INFO [ReplicaFetcherManager on broker 2] Added 
fetcher for partitions List([[MSG03,0], initOffset 29935 to broker 
BrokerEndPoint(1,perf-bench-02,9092)] ) (kafka.server.ReplicaFetcherManager)
[2016-10-20 11:09:23,851] INFO [ReplicaFetcherManager on broker 2] Removed 
fetcher for partitions [MSG02,0] (kafka.server.ReplicaFetcherManager)
[2016-10-20 11:09:23,851] INFO Truncating log MSG02-0 to offset 30041. 
(kafka.log.Log)
[2016-10-20 11:09:23,852] INFO [ReplicaFetcherManager on broker 2] Added 
fetcher for partitions List([[MSG02,0], initOffset 30041 to broker 
BrokerEndPoint(1,perf-bench-02,9092)] ) (kafka.server.ReplicaFetcherManager)

It seems there is some kind of reorganization of the topics/partitions, with
offsets truncated. Could that be the cause of all these errors?


Regards,
Frederic Girard.




Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread Sachin Mittal
Hi,
We are using aggregation by key on a kstream to create a ktable.
As I read from
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
it creates an internal changelog topic.

However over the time the streaming application is run message size
increases and it starts throwing max.message.bytes exception.

Is there a way to control the retention.ms time for internal changelog
topics so that messages are purged before they exceed this size.

If not is there a way to control or avoid such an error.

Thanks
Sachin


Protecting kafka-producer against unavailability of all brokers (request.timeout.ms)

2016-11-08 Thread Lukasz Druminski
Hi,

We are using kafka-producer 0.8.2 in production. We configured it with
retries set to Integer.MAX_VALUE and buffer.memory set to 1GB.
Thanks to this setup we are protected from unavailability of all brokers
for around one hour (taking into account our production traffic).
For example, when all brokers from a single DC/zone are down, the
kafka-producer buffers all incoming messages in its accumulator until it is full.
When the brokers are available again, the producer sends all the buffered
messages to Kafka. Thanks to this we have some time for recovery and don't
lose messages at all.
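
For reference, the described setup maps onto the Java producer configs roughly like this (broker list and serializers are placeholders):

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("retries", Integer.MAX_VALUE);            // retry (practically) forever
props.put("buffer.memory", 1024L * 1024L * 1024L);  // 1 GB accumulator
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);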

Now, we would like to migrate to the newest kafka-producer, 0.10.1, but we
have a problem with preserving the described behaviour because of changes
introduced to the producer library:

- proposal about adding request timeout to NetworkClient
https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
- producer record can stay in RecordAccumulator forever if leader is not
available https://issues.apache.org/jira/browse/KAFKA-1788
- add a request timeout to NetworkClient
https://issues.apache.org/jira/browse/KAFKA-2120

These changes provide request.timeout.ms parameter which is used in:

1. actual network RTT
2. server replication time
3. new mechanism for aborting expired batches

When brokers are unavailable for more than request.timeout.ms, the
kafka-producer starts dropping batches from the accumulator with a
TimeoutException in the callback, with the message:

  "Batch containing " + recordCount + " record(s) expired due to timeout
while requesting metadata from brokers for " + topicPartition

As a possible solution, to protect against unavailability of all brokers,
in the newest kafka-producer:

- I could increase request.timeout.ms to one hour and batches would be
dropped after that time but this value is not reasonable for (1) and (2)
- I could catch the TimeoutException and send the corresponding message to the
kafka-producer again, but then I have no guarantee that there will be
free space in the accumulator

In my opinion timeout for (3) should be independent from (1) and (2), or
dropping expired batches should be an optional feature.
What do you think about this issue? Do you have any suggestion/solution for
this use case?

Best regards,
Luke Druminski


Re: Understanding zookeeper and kafka server failures

2016-11-08 Thread Karolis Pocius
It depends on the size and load of your cluster. Zookeeper is very I/O
sensitive, so at the very least you have to make sure it doesn't share a disk
with the OS or Kafka.


I assume you've read the documentation, but you might want to have a
look at https://kafka.apache.org/documentation.html#zkops again; it
provides the reasoning behind why you shouldn't run zookeeper and kafka
together.



On 2016.11.08 17:47, Sachin Mittal wrote:

Hi,
Thanks for the reply. Apart from the obvious reason that if the server crashes
then both zookeeper and the broker crash, is there any other reason why we should
not run the broker and zookeeper on the same server?

If the chances of a server crash are extremely low and it can be brought back up
quickly, then can we keep both on the same server?

Thanks
Sachin



On Tue, Nov 8, 2016 at 8:59 PM, Karolis Pocius  wrote:


The question is what happens if one of the zookeeper crashes. Will the

broker on that node also will crash?


If 1/3 zookeeper nodes crashes, the other two will take over. Kafka broker
will not crash. However, you should not run zookeeper and kafka on the same
server in production.


What happens if broker crashes? I suppose other two brokers will take the
load.


Yes, the other two will take the load, but it also depends on the number
of partitions and how they are distributed across cluster.

Now what happens if 2 zookeeper nodes crashes.

Or if 2 brokers crashes. Will my cluster be still working in this case.


If 2 zookeeper nodes crash, there's no longer a majority and your cluster
will be down. If 2 kafka brokers crash and you have replication factor 3
you should be OK, but as in previous answer - it depends on the number of
partitions and how they're spread in the cluster.

So basically what is the difference between zookeeper failure and

server/broker failure.


Again, you shouldn't run zookeeper and kafka on the same server in
production. So the difference is that while kafka is responsible for data,
zookeeper is coordinating tasks between kafka nodes.



On 2016.11.08 17:12, Sachin Mittal wrote:


Hi,
We have following setup.
1. Three instances of zookeeper on three machines.
2. Three instances of kafka server on same three machines.
3. All the topics have replication factor 3.

So when we create a topic on any node, i see that it gets replicated on
all
three instances.
I also see that topic data is getting replicated to all three nodes.

The data to main topic is written by three producers to which all three zk
nodes config is provided in connect string.

This is all working fine.

The question is what happens if one of the zookeeper crashes. Will the
broker on that node also will crash?

What happens if broker crashes? I suppose other two brokers will take the
load.

Now what happens if 2 zookeeper nodes crashes.
Or if 2 brokers crashes. Will my cluster be still working in this case.

So basically what is the difference between zookeeper failure and
server/broker failure.

Thanks
Sachin






Re: Understanding zookeeper and kafka server failures

2016-11-08 Thread Sachin Mittal
Hi,
Thanks for the reply. Apart from the obvious reason that if the server crashes
then both zookeeper and the broker crash, is there any other reason why we should
not run the broker and zookeeper on the same server?

If the chances of a server crash are extremely low and it can be brought back up
quickly, then can we keep both on the same server?

Thanks
Sachin



On Tue, Nov 8, 2016 at 8:59 PM, Karolis Pocius  wrote:

> The question is what happens if one of the zookeeper crashes. Will the
>> broker on that node also will crash?
>>
> If 1/3 zookeeper nodes crashes, the other two will take over. Kafka broker
> will not crash. However, you should not run zookeeper and kafka on the same
> server in production.
>
>> What happens if broker crashes? I suppose other two brokers will take the
>> load.
>>
> Yes, the other two will take the load, but it also depends on the number
> of partitions and how they are distributed across cluster.
>
> Now what happens if 2 zookeeper nodes crashes.
>> Or if 2 brokers crashes. Will my cluster be still working in this case.
>>
> If 2 zookeeper nodes crash, there's no longer a majority and your cluster
> will be down. If 2 kafka brokers crash and you have replication factor 3
> you should be OK, but as in previous answer - it depends on the number of
> partitions and how they're spread in the cluster.
>
> So basically what is the difference between zookeeper failure and
>> server/broker failure.
>>
> Again, you shouldn't run zookeeper and kafka on the same server in
> production. So the difference is that while kafka is responsible for data,
> zookeeper is coordinating tasks between kafka nodes.
>
>
>
> On 2016.11.08 17:12, Sachin Mittal wrote:
>
>> Hi,
>> We have following setup.
>> 1. Three instances of zookeeper on three machines.
>> 2. Three instances of kafka server on same three machines.
>> 3. All the topics have replication factor 3.
>>
>> So when we create a topic on any node, i see that it gets replicated on
>> all
>> three instances.
>> I also see that topic data is getting replicated to all three nodes.
>>
>> The data to main topic is written by three producers to which all three zk
>> nodes config is provided in connect string.
>>
>> This is all working fine.
>>
>> The question is what happens if one of the zookeeper crashes. Will the
>> broker on that node also will crash?
>>
>> What happens if broker crashes? I suppose other two brokers will take the
>> load.
>>
>> Now what happens if 2 zookeeper nodes crashes.
>> Or if 2 brokers crashes. Will my cluster be still working in this case.
>>
>> So basically what is the difference between zookeeper failure and
>> server/broker failure.
>>
>> Thanks
>> Sachin
>>
>>
>


Re: Understanding zookeeper and kafka server failures

2016-11-08 Thread Karolis Pocius

> The question is what happens if one of the zookeepers crashes. Will the
> broker on that node also crash?

If 1 of 3 zookeeper nodes crashes, the other two will take over. The Kafka
broker will not crash. However, you should not run zookeeper and kafka
on the same server in production.

> What happens if a broker crashes? I suppose the other two brokers will take
> the load.

Yes, the other two will take the load, but it also depends on the number
of partitions and how they are distributed across the cluster.



> Now what happens if 2 zookeeper nodes crash?
> Or if 2 brokers crash? Will my cluster still be working in this case?

If 2 zookeeper nodes crash, there's no longer a majority (a 3-node ensemble
needs at least floor(3/2) + 1 = 2 nodes up) and your cluster will be down.
If 2 kafka brokers crash and you have replication factor 3 you should be OK,
but as in the previous answer - it depends on the number of partitions and
how they're spread across the cluster.



> So basically what is the difference between zookeeper failure and
> server/broker failure?

Again, you shouldn't run zookeeper and kafka on the same server in
production. So the difference is that while kafka is responsible for the
data, zookeeper is coordinating tasks between the kafka nodes.



On 2016.11.08 17:12, Sachin Mittal wrote:

Hi,
We have following setup.
1. Three instances of zookeeper on three machines.
2. Three instances of kafka server on same three machines.
3. All the topics have replication factor 3.

So when we create a topic on any node, i see that it gets replicated on all
three instances.
I also see that topic data is getting replicated to all three nodes.

The data to main topic is written by three producers to which all three zk
nodes config is provided in connect string.

This is all working fine.

The question is what happens if one of the zookeeper crashes. Will the
broker on that node also will crash?

What happens if broker crashes? I suppose other two brokers will take the
load.

Now what happens if 2 zookeeper nodes crashes.
Or if 2 brokers crashes. Will my cluster be still working in this case.

So basically what is the difference between zookeeper failure and
server/broker failure.

Thanks
Sachin





Understanding zookeeper and kafka server failures

2016-11-08 Thread Sachin Mittal
Hi,
We have the following setup:
1. Three instances of zookeeper on three machines.
2. Three instances of kafka server on the same three machines.
3. All the topics have replication factor 3.

So when we create a topic on any node, I see that it gets replicated on all
three instances.
I also see that topic data is getting replicated to all three nodes.

The data for the main topic is written by three producers, to which the config
for all three zk nodes is provided in the connect string.

This is all working fine.

The question is: what happens if one of the zookeepers crashes? Will the
broker on that node also crash?

What happens if a broker crashes? I suppose the other two brokers will take the
load.

Now what happens if 2 zookeeper nodes crash?
Or if 2 brokers crash? Will my cluster still be working in this case?

So basically, what is the difference between a zookeeper failure and a
server/broker failure?

Thanks
Sachin


Re: consumer client pause/resume/rebalance

2016-11-08 Thread Paul Mackles
Hi Gwen - Makes sense. The way you explain it actually reminds me a little of 
the "worse is better" philosophy: https://www.jwz.org/doc/worse-is-better.html


Perhaps a mention in the javadoc for pause() and/or ConsumerRebalanceListener 
would be sufficient.


From: Gwen Shapira 
Sent: Monday, November 07, 2016 3:34:39 PM
To: Users
Subject: Re: consumer client pause/resume/rebalance

I think the current behavior is fairly reasonable. Following a
rebalance the entire state of the consumer changes - you may get an
entirely new set of partitions. A common use-case for pause is to
allow a consumer to keep polling and avoid getting new events while it
is retrying to process existing events - well, following a rebalance,
it is possible that another consumer owns the partition, is already
re-processing these events and the entire state needs to be reset.

I usually recommend developers to treat rebalance as a restart (since
you are getting a whole new set of partitions) and just follow
whatever process you'd follow to set up after a restart. Since pauses
don't survive restarts, I wouldn't expect them to survive a rebalance
either.

I hope this helps explain the behavior?

On Mon, Nov 7, 2016 at 9:53 AM, Paul Mackles  wrote:
> Using the v0.9.0.1 consumer API, I recently learned that paused partitions
> can unexpectedly become unpaused during a rebalance. I also found an
> old thread from the mailing list which corroborates this behavior:
>
>
> http://grokbase.com/t/kafka/users/161wgzckze/new-consumer-pause-reset-behaviour
>
>
> While I can maintain the partition state myself, it seems like it would be a lot
> easier if this were either handled internally by the consumer API (i.e. pausing
> the partitions that were previously paused before resuming) and/or the
> partition state were made available to the RebalanceListener.
>
>
> I did not find any existing tickets in JIRA related to this so I am wondering 
> if this is a valid bug/enhancement or if someone found a decent workaround. 
> All of the consumer API examples that I have found do not appear to handle 
> this scenario.
>
>
> Here is the code snippet from the client I have been working on:
>
>
> consumer.pause(consumer.assignment().toArray(EMPTYTPARRAY));
>
> while (!isWritable()) {
>   // WARNING: if there is a rebalance, this call may return some records!!!
>   consumer.poll(0);
>   Uninterruptibles.sleepUninterruptibly(pauseWait, TimeUnit.MILLISECONDS);
> }
>
> consumer.resume(consumer.assignment().toArray(EMPTYTPARRAY));
>
>
> Thanks,
>
> Paul
>
>
>



--
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


Re: is there a way to make sure two consumers receive the same message from the broker?

2016-11-08 Thread kant kodali
:)

On Tue, Nov 8, 2016 at 1:37 AM, AmirHossein Roozbahany 
wrote:

> Excuse me this part was non-sense: if the latest update to a document in
> es always win in Cassandra's LWW, they will "eventually" "converge".
> 
> From: AmirHossein Roozbahany
> Sent: ‎11/‎8/‎2016 8:16 AM
> To: users@kafka.apache.org
> Subject: RE: is there a way to make sure two consumers receive the same
> message from the broker?
>
> Generally Cassandra itself is not consistent enough, even with quorum
> read-writes, say one of the writes fail, the nodes who received the data
> won't roll back and it might lead to dirty reads which in turn makes roll
> back logic tricky (if not impossible). You can use linearizable writes but
> if you want to use them all the time, why bother using Cassandra!?
>
> The important thing about Cassandra is that all of the nodes will
> eventually have the same data(after anti-entropy or read-repair). they are
> "convergent", they will "eventually" converge, and practically they
> converge pretty fast.
>
> I think what you might actually need is to make two databases
> convergent(not necessarily fully consistent at any given time) if the
> latest update to a document in es leads always win when Cassandra is doing
> es they will "eventually" "converge".
>
> Doing so is easy, as fast as I know es assigns a _version number to a
> document and increases it on every update, now if your use of in Cassandra
> insert statement as the "writetime". Now when Cassandra is doing read
> repair the record with higher writetime will win.
>
> Using es's document _version field is just one option, you can use
> something from you domain or kafka's offset or machine timestamp (not
> recommended at all).
>
> I hope it could help
> 
> From: kant kodali
> Sent: ‎11/‎7/‎2016 8:18 PM
> To: users@kafka.apache.org
> Subject: Re: is there a way to make sure two consumers receive the same
> message from the broker?
>
> Hi AmitHossein,
>
> I still don't see how that guarantees consistency at any given time. other
> words how do I know at time X the data in Cassandra and ES are the same.
>
> Thanks
>
>
> On Mon, Nov 7, 2016 at 3:26 AM, AmirHossein Roozbahany <
> diver...@outlook.com
> > wrote:
>
> > Hi
> >
> > Can you use elasticsearch _version field as cassandra's
> > writetime?(_version is strictly increasing, cassandra uses writetime for
> > applying LWW, so last write in elasticsearch will always win)
> >
> > It needs no transaction and makes databases convergent.
> >
> >
> > 
> > From: kant kodali 
> > Sent: Monday, November 7, 2016 3:08 AM
> > To: users@kafka.apache.org
> > Subject: Re: is there a way to make sure two consumers receive the same
> > message from the broker?
> >
> > Hi Hans,
> >
> > The two storages we use are Cassandra and Elastic search and they are on
> > the same datacenter for now.
> > The Programming Language we use is Java and OS would be Ubuntu or CentOS.
> > We get messages in JSON format so we insert into Elastic Search directly
> > and for Cassandra we transform JSON message into appropriate model so we
> > could insert into a Cassandra table.
> > The rate we currently get is about 100K/sec which is awesome but I am
> > pretty sure this will go down once when we implement 2PC or transactional
> > writes.
> >
> > Thanks,
> > kant
> >
>


RE: is there a way to make sure two consumers receive the same message from the broker?

2016-11-08 Thread AmirHossein Roozbahany
Excuse me, this part was nonsense: if the latest update to a document in es
always wins in Cassandra's LWW, they will "eventually" "converge".

From: AmirHossein Roozbahany
Sent: ‎11/‎8/‎2016 8:16 AM
To: users@kafka.apache.org
Subject: RE: is there a way to make sure two consumers receive the same message 
from the broker?

Generally Cassandra itself is not consistent enough, even with quorum
reads/writes: say one of the writes fails; the nodes that received the data won't
roll back, and that might lead to dirty reads, which in turn makes rollback logic
tricky (if not impossible). You can use linearizable writes, but if you want to
use them all the time, why bother using Cassandra!?

The important thing about Cassandra is that all of the nodes will eventually
have the same data (after anti-entropy or read repair). They are "convergent":
they will "eventually" converge, and in practice they converge pretty fast.

I think what you might actually need is to make the two databases convergent
(not necessarily fully consistent at any given time): if the latest update to a
document in es always wins in Cassandra's LWW resolution, they will
"eventually" "converge".

Doing so is easy. As far as I know, es assigns a _version number to a document
and increases it on every update; you can then use that value in the Cassandra
insert statement as the "writetime". Now when Cassandra is doing read
repair, the record with the higher writetime will win.

Using es's document _version field is just one option; you can use something
from your domain, or kafka's offset, or the machine timestamp (not recommended at all).
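
A rough sketch of what that could look like on the Cassandra side, using CQL's USING TIMESTAMP with the es _version (keyspace, table, columns and the version variable are made up; execute the statement with whatever Cassandra driver you use):

long esVersion = indexResponse.getVersion();   // the _version returned when indexing into es (hypothetical variable)
String cql = "INSERT INTO my_keyspace.my_table (id, payload) " +
             "VALUES (?, ?) USING TIMESTAMP " + esVersion;   // LWW now follows the es version ordering
// note: this only works if every write to the table supplies its timestamp this way;
// otherwise normal writes (which default to the current time in microseconds) will always win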

I hope it could help

From: kant kodali
Sent: ‎11/‎7/‎2016 8:18 PM
To: users@kafka.apache.org
Subject: Re: is there a way to make sure two consumers receive the same message 
from the broker?

Hi AmirHossein,

I still don't see how that guarantees consistency at any given time. In other
words, how do I know that at time X the data in Cassandra and ES are the same?

Thanks


On Mon, Nov 7, 2016 at 3:26 AM, AmirHossein Roozbahany  wrote:

> Hi
>
> Can you use elasticsearch _version field as cassandra's
> writetime?(_version is strictly increasing, cassandra uses writetime for
> applying LWW, so last write in elasticsearch will always win)
>
> It needs no transaction and makes databases convergent.
>
>
> 
> From: kant kodali 
> Sent: Monday, November 7, 2016 3:08 AM
> To: users@kafka.apache.org
> Subject: Re: is there a way to make sure two consumers receive the same
> message from the broker?
>
> Hi Hans,
>
> The two storages we use are Cassandra and Elastic search and they are on
> the same datacenter for now.
> The Programming Language we use is Java and OS would be Ubuntu or CentOS.
> We get messages in JSON format so we insert into Elastic Search directly
> and for Cassandra we transform JSON message into appropriate model so we
> could insert into a Cassandra table.
> The rate we currently get is about 100K/sec which is awesome but I am
> pretty sure this will go down once when we implement 2PC or transactional
> writes.
>
> Thanks,
> kant
>