Re: Event sourcing with Kafka and Kafka Streams. How to deal with atomicity
With Kafka Streams you get those and atomicity via exactly-once semantics.

Michał

On 21/07/17 14:51, Chris Richardson wrote:
Hi,

I like Kafka but I don't understand the claim that it can be used for Event Sourcing (here <http://microservices.io/patterns/data/event-sourcing.html> and here <https://martinfowler.com/eaaDev/EventSourcing.html>).

One part of event sourcing is the ability to subscribe to events published by aggregates, and clearly Kafka works well there. But the other part of Event Sourcing is "database"-like functionality, which includes:
- findEventsByPrimaryKey() - needed to be able to reconstruct an aggregate from its events - the essence of event sourcing
- Atomic updates - for updating aggregates (findEventsByPrimaryKey() - business logic - insertNewEvents()) in order to handle this kind of scenario.

The approach we have taken is to implement event sourcing using a database and Kafka. For instance, see https://blog.eventuate.io/2016/10/06/eventuate-local-event-sourcing-and-cqrs-with-spring-boot-apache-kafka-and-mysql/

Chris
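For reference, exactly-once semantics in Kafka Streams (0.11.0+) is enabled with a single config; a minimal sketch follows - the application id, bootstrap servers and topic names are illustrative only, not from this thread:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class ExactlyOnceApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "event-sourcing-app");   // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // illustrative
        // state store updates, changelog/output writes and offset commits
        // are then committed atomically per task
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("events").to("projections");                             // illustrative topology
        new KafkaStreams(builder, new StreamsConfig(props)).start();
    }
}

With that setting the writes a processing step performs and its input offsets commit or abort together, which is the "atomic updates" half of the question as far as the Streams side is concerned.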
Re: Kafka compatibility matrix needed
Have you seen this: http://kafka.apache.org/documentation.html#upgrade

Starting with version 0.10.2, Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.11.0 clients can talk to version 0.10.0 or newer brokers. However, if your brokers are older than 0.10.0, you must upgrade all the brokers in the Kafka cluster before upgrading your clients. Version 0.11.0 brokers support 0.8.x and newer clients.

Hope that helps.

Cheers,
Michał

On 18/07/17 08:17, Sachin Mittal wrote:
Hi,
I would like some help/information on what client versions are compatible with what broker versions in Kafka. Some table like this would be good:

                        client
  server     0.8     0.9     0.10    0.11
  0.8        yes     ?       ?       ?
  0.9        ?       yes     ?       ?
  0.10       ?       ?       yes     ?
  0.11       ?       ?       ?       yes

So if the question marks were filled in it would be of great help. The reason I am asking is that many times we need to use other libraries/frameworks to pull/push data from/into Kafka, and sometimes these support only a particular version of clients. Right now, for example, I am trying to pull data from Kafka via druid/tranquility and they have clients of version 0.8.x implemented, but my broker is running 0.10.x.

Also, if such a table could be posted on the Kafka documentation page or GitHub page that would be great.

Thanks
Sachin
Re: Kafka Streams Usage Patterns
Hi all,

Another pattern I think is worth adding is a sliding-windowed message reordering and de-duplicating processor. The outline I have in mind is based on this (just the timestamp would come from the record context - in this question the timestamp was in the body of the message): https://stackoverflow.com/a/44345374/7897191

Please let me know if you have a better design for this - a rough sketch of what I mean follows at the end of this message.

Cheers,
Michal

On 27/05/17 21:16, Jay Kreps wrote:
This is great!
-Jay

On Sat, May 27, 2017 at 12:47 PM, Michal Borowiecki <michal.borowie...@openbet.com <mailto:michal.borowie...@openbet.com>> wrote:

Hi all,

I've updated the wiki page with a draft pattern for consecutively growing time-windowed aggregations which was discussed some time ago on this mailing list. I'm yet to add the part that cleans up the stores using punctuations. Stay tuned.

On a somewhat similar subject, I've been working to implement the following requirements:
* transaction sums per customer session (simple, just extract non-expired session-windowed aggregates from a SessionStore using interactive queries)
* global transaction sums for all _currently active_ customer sessions

The second bit proved non-trivial, because session-windowed KTables (or any windowed KTables for that matter) don't notify downstream when a window expires. And I can't use punctuate until KIP-138 is implemented, because stream time punctuation is no good in this case (records can stop coming); reliable system time punctuation would be needed.

Below is how I implemented this; I'm yet to test it thoroughly. I wonder if anyone knows of an easier way of achieving the same. If so, I'm looking forward to suggestions. If not, I'll add that to the patterns wiki page too, in case someone else finds it useful.

builder
    .stream(/*key serde*/, /*transaction serde*/, "transaction-topic")
    .groupByKey(/*key serde*/, /*transaction serde*/)
    .aggregate(
        () -> /*empty aggregate*/,
        aggregator(),
        merger(),
        SessionWindows.with(SESSION_TIMEOUT_MS).until(SESSION_TIMEOUT_MS * 2),
        /* aggregate serde */,
        txPerCustomerSumStore() // this store can be queried for per customer session data
    )
    .toStream()
    .filter((key, value) -> value != null) // tombstones only come when a session is merged into a bigger session, so ignore them
    // the below map/groupByKey/reduce operations are to only propagate updates to the _latest_ session per customer to downstream
    .map((windowedCustomerId, agg) -> // this moves the timestamp from the windowed key into the value
                                      // so that we can group by customerId only and reduce to the later value
        new KeyValue<>(
            windowedCustomerId.key(), // just customerId
            new WindowedAggsImpl( // this is just like a tuple2 but with nicely named accessors: timestamp() and aggs()
                windowedCustomerId.window().end(),
                agg
            )
        )
    )
    .groupByKey( /*key serde*/, /*windowed aggs serde*/ ) // key is just customerId
    .reduce( // take the later session value and ignore any older - downstream only cares about _current_ sessions
        (val, agg) -> val.timestamp() > agg.timestamp() ? val : agg,
        TimeWindows.of(SESSION_TIMEOUT_MS).advanceBy(SESSION_TIMEOUT_DELAY_TOLERANCE_MS),
        "latest-session-windowed"
    )
    .groupBy((windowedCustomerId, timeAndAggs) -> // calculate totals with maximum granularity, which is per-partition
        new KeyValue<>(
            new Windowed<>(
                windowedCustomerId.key().hashCode() % PARTITION_COUNT_FOR_TOTALS, // KIP-159 would come in handy here, to access the partition number instead
                windowedCustomerId.window() // will use this in the interactive queries to pick the oldest not-yet-expired window
            ),
            timeAndAggs.aggs()
        ),
        new SessionKeySerde<>(Serdes.Integer()),
        /* aggregate serde */
    )
    .reduce(
        (val, agg) -> agg.add(val),
        (val, agg) -> agg.subtract(val),
        txTotalsStore() // this store can be queried to get totals per partition for all active sessions
    );

builder.globalTable(
    new SessionKeySerde<>(Serdes.Integer()),
    /* aggregate serde */,
    changelogTopicForStore(TRANSACTION_TOTALS), "totals");
// this global table puts per-partition totals on every node, so that they can be easily summed for global totals, picking the oldest not-yet-expired window

TODO: put in StreamPartitioners (with the KTable.through variants added in KAFKA-5045) to avoid re-partitioning where I know it's unnecessary.

The idea behind the % PARTITION_COUNT_FOR_TOTALS bit is that I want to first do summation with max parallelism and minimize the work needed downstream.
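For the record, the reordering/de-duplicating transformer mentioned at the top of this message would look roughly like this (a sketch only, based on the Stack Overflow outline above, against the 0.10.2/0.11 Transformer API; the store name, String serdes, composite-key scheme and window size are illustrative and untested):

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class ReorderBuffer implements Transformer<String, String, KeyValue<String, String>> {

    private static final long WINDOW_MS = 60_000L;     // reordering window, illustrative
    private ProcessorContext context;
    private KeyValueStore<String, String> buffer;      // "<zero-padded timestamp>-<key>" -> value

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // "reorder-buffer" must have been registered via builder.addStateStore(...)
        // and named in stream.transform(ReorderBuffer::new, "reorder-buffer")
        this.buffer = (KeyValueStore<String, String>) context.getStateStore("reorder-buffer");
        context.schedule(WINDOW_MS / 2);                // punctuate as stream time advances
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        // buffer under a key that sorts by timestamp; writing the same composite key
        // twice overwrites, which is what de-duplicates repeated deliveries
        buffer.put(String.format("%020d-%s", context.timestamp(), key), value);
        return null;                                    // nothing forwarded inline
    }

    @Override
    public KeyValue<String, String> punctuate(long streamTime) {
        List<String> emitted = new ArrayList<>();
        try (KeyValueIterator<String, String> it = buffer.all()) { // iterates in key, i.e. timestamp, order
            while (it.hasNext()) {
                KeyValue<String, String> entry = it.next();
                long ts = Long.parseLong(entry.key.substring(0, 20));
                if (ts > streamTime - WINDOW_MS) {
                    break;                              // still inside the reordering window
                }
                context.forward(entry.key.substring(21), entry.value);
                emitted.add(entry.key);
            }
        }
        emitted.forEach(buffer::delete);                // clean up what we've already forwarded
        return null;
    }

    @Override
    public void close() {
    }
}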
Re: ticketing system Design
If your business flow involves human actions, personally I would look at a business process engine like the open source Camunda. Even if you don't choose to use it in production, you can use it to prototype and evolve your design at the inception stage.

There's a simple-to-run example that integrates with Kafka here: https://github.com/flowing/flowing-retail
And a tutorial that involves a human action in the flow here (but no Kafka): https://docs.camunda.org/get-started/bpmn20/

(NB. My personal interest in Camunda is for integrating it as a process manager/saga element in an event-sourced service at some point.)

Cheers,
Michał

On 21/06/17 03:25, Tarun Garg wrote:
Need some more input on this.

Kafka is a queue; it doesn't take any action. A sender (producer) sends data to Kafka and a consumer pulls data from the Kafka queue, so there is no assignment of data to any consumer. If a process/human can't take an action then Kafka can't help in this case.

Hope it answers.

From: Abhimanyu Nagrath <abhimanyunagr...@gmail.com>
Sent: Monday, June 19, 2017 8:01 PM
To: users@kafka.apache.org
Subject: Re: ticketing system Design

Hi,
Can anyone suggest where I can get the answer for these types of questions?

Regards,
Abhimanyu

On Thu, Jun 8, 2017 at 6:49 PM, Abhimanyu Nagrath <abhimanyunagr...@gmail.com> wrote:
Hi,
Can Apache Kafka along with Storm be used to design a ticketing system? By ticketing system, I mean that there are millions of tasks stored in Kafka queues and there are processes/humans to take some actions on the tasks. There are some constraints: the same task should not be assigned to two processes/humans, and if a task flows to a process/human and no action is performed it should be reassigned. I am not sure whether this can be solved using Kafka. Any help is appreciated.

Regards,
Abhimanyu
Re: Kafka Streams: Problems implementing rate limiting processing step
Thanks, maybe I misread it:

"I then tried implementing my own scheduling that periodically sends/clears out messages using the ProcessorContext provided to the aforementioned transform step"

I understood it to say it inspects a state store, sends the messages that should be sent and removes them from the store. I might have read too much out of it though.

Cheers,
Michał

On 20/06/17 17:59, Matthias J. Sax wrote:
"I didn't know you could write to state stores from outside a processor/transformer."
You can't. And as far as I understand this thread, nobody said you can. Did I miss something?
-Matthias

On 6/20/17 1:02 AM, Michal Borowiecki wrote:
I didn't know you could write to state stores from outside a processor/transformer. Interesting to hear that it is working, although I'd be careful, as KIP-67 warns it can introduce undefined behaviour:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
"Operations on state stores: The focus is on querying state stores, not updating them. It is not clear what it would mean to update a state store from outside the stream processing framework. Such updates are likely to introduce undefined behavior to the framework."

The way I'd approach it until KIP-138 is released is to still use punctuate() but to also use your own scheduling to send periodic "tick" messages into the input topic. These messages can be ignored by the Processor but will cause the stream time to advance reliably. Just need to ensure they are distributed uniformly to all partitions. I appreciate this is not an elegant workaround but this is what I've settled for in the interim.

Cheers,
Michal

On 19/06/17 23:03, Steven Schlansker wrote:
On Jun 19, 2017, at 2:02 PM, Andre Eriksson <an...@tcell.io> wrote:
I then tried implementing my own scheduling that periodically sends/clears out messages using the ProcessorContext provided to the aforementioned transform step. However, it seems that when I call forward() from my scheduler (i.e. not in a process()/punctuate() call), I get a NullPointerException at ProcessorContextImpl.java:81 (https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L81). I assume that this is because currentNode() is null outside of process()/punctuate() calls.

There may be more elegant or direct solutions, but if all else fails you could always consider producing to a topic rather than trying to forward directly, then you don't have to touch the relatively delicate Processor semantics.
Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL Permission of OffsetFetch
+1

On 19/06/17 21:31, Vahid S Hashemian wrote:
Thanks everyone. Great discussion.
Because these Read or Write actions are interpreted in conjunction with particular resources (Topic, Group, ...) it would also make more sense to me that for committing offsets the ACL should be (Group, Write). So, a consumer would be required to have (Topic, Read), (Group, Write) ACLs in order to function.
--Vahid

From: Colin McCabe <cmcc...@apache.org>
To: users@kafka.apache.org
Date: 06/19/2017 11:01 AM
Subject: Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL Permission of OffsetFetch

Thanks for the explanation. I still think it would be better to have the mutation operations require write ACLs, though. It might not be 100% intuitive for novice users, but the current split between Describe and Read is not intuitive for either novice or experienced users. In any case, I am +1 on the incremental improvement discussed in KIP-163.
cheers,
Colin

On Sat, Jun 17, 2017, at 11:11, Hans Jespersen wrote:
Offset commit is something that is done in the act of consuming (or reading) Kafka messages. Yes, technically it is a write to the Kafka consumer offset topic, but it's much easier for administrators to think of ACLs in terms of whether the user is allowed to write (Produce) or read (Consume) messages, and not the lower-level semantics, which are that consuming is actually reading AND writing (albeit only to the offset topic).
-hans

On Jun 17, 2017, at 10:59 AM, Viktor Somogyi <viktor.somo...@cloudera.com> wrote:
Hi Vahid,
+1 for OffsetFetch from me too.
I also wanted to ask about the strangeness of the permissions, like why is OffsetCommit a Read operation instead of Write, which would intuitively make more sense to me. Perhaps an expert could shed some light on this? :)
Viktor

On Tue, Jun 13, 2017 at 2:38 PM, Vahid S Hashemian <vahidhashem...@us.ibm.com <mailto:vahidhashem...@us.ibm.com>> wrote:
Hi Michal,
Thanks a lot for your feedback. Your statement about Heartbeat is fair and makes sense. I'll update the KIP accordingly.
--Vahid

From: Michal Borowiecki <michal.borowie...@openbet.com>
To: users@kafka.apache.org, Vahid S Hashemian <vahidhashem...@us.ibm.com>, d...@kafka.apache.org
Date: 06/13/2017 01:35 AM
Subject: Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL Permission of OffsetFetch

Hi Vahid,
+1 wrt OffsetFetch.
The "Additional Food for Thought" mentions Heartbeat as a non-mutating action. I don't think that's true, as the GroupCoordinator updates the latestHeartbeat field for the member and adds a new object to the heartbeatPurgatory; see completeAndScheduleNextHeartbeatExpiration() called from handleHeartbeat().
NB added dev mailing list back into CC as it seems to have been lost along the way.
Cheers,
Michał

On 12/06/17 18:47, Vahid S Hashemian wrote:
Hi Colin,
Thanks for the feedback.
To be honest, I'm not sure either why Read was selected instead of Write for mutating APIs in the initial design (I asked Ewen on the corresponding JIRA and he seemed unsure too). Perhaps someone who was involved in the design can clarify.
Thanks.
--Vahid

From: Colin McCabe <cmcc...@apache.org>
To: users@kafka.apache.org
Date: 06/12/2017 10:11 AM
Subject: Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL Permission of OffsetFetch

Hi Vahid,
I think you make a valid point that the ACLs controlling group operations are not very intuitive.
This is probably a dumb question, but why are we using Read for mutating APIs? Shouldn't that be Write? The distinction between Describe and Read makes a lot of sense for Topics. A group isn't really something that you "read" from in the same way as a topic, so it always felt kind of weird there.
best,
Colin

On Thu, Jun 8, 2017, at 11:29, Vahid S Hashemian wrote:
Hi all,
I'm resending my earlier note hoping it would spark some conversation this time around :)
Thanks.
--Vahid

From: "Vahid S Hashemian" <vahidhashem...@us.ibm.com>
To: dev <d...@kafka.apache.org>, "Kafka User" <users@kafka.apache.org>
Date: 05/30/2017 08:33 AM
Subject: KIP-163: Lower the Minimum Required ACL Permission of OffsetFetch

Hi,
I started a new KIP to improve the minimum required ACL permissions of some of the APIs:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-163%3A+Lower+the+Minimum+Required+ACL+Permission+of+OffsetFetch
Re: Kafka Streams: Problems implementing rate limiting processing step
I didn't know you could write to state stores from outside a processor/transformer. Interesting to hear that it is working, although I'd be careful, as KIP-67 warns it can introduce undefined behaviour:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
"Operations on state stores: The focus is on querying state stores, not updating them. It is not clear what it would mean to update a state store from outside the stream processing framework. Such updates are likely to introduce undefined behavior to the framework."

The way I'd approach it until KIP-138 is released is to still use punctuate() but to also use your own scheduling to send periodic "tick" messages into the input topic. These messages can be ignored by the Processor but will cause the stream time to advance reliably. Just need to ensure they are distributed uniformly to all partitions. I appreciate this is not an elegant workaround but this is what I've settled for in the interim (a rough sketch follows at the end of this message).

Cheers,
Michal

On 19/06/17 23:03, Steven Schlansker wrote:
On Jun 19, 2017, at 2:02 PM, Andre Eriksson <an...@tcell.io> wrote:
I then tried implementing my own scheduling that periodically sends/clears out messages using the ProcessorContext provided to the aforementioned transform step. However, it seems that when I call forward() from my scheduler (i.e. not in a process()/punctuate() call), I get a NullPointerException at ProcessorContextImpl.java:81 (https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L81). I assume that this is because currentNode() is null outside of process()/punctuate() calls.

There may be more elegant or direct solutions, but if all else fails you could always consider producing to a topic rather than trying to forward directly, then you don't have to touch the relatively delicate Processor semantics.
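And the "tick" workaround sketched out (again, only a sketch - the topic name, interval and serializers are illustrative):

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TickProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");                  // illustrative
        KafkaProducer<String, String> producer =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

        int partitions = producer.partitionsFor("input-topic").size();     // illustrative topic
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // send a "tick" to every partition so stream time advances on all tasks;
        // the Processor simply ignores records whose value is "tick"
        scheduler.scheduleAtFixedRate(() -> {
            for (int p = 0; p < partitions; p++) {
                producer.send(new ProducerRecord<>("input-topic", p, "tick", "tick"));
            }
        }, 0, 10, TimeUnit.SECONDS);
    }
}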
Re: Kafka 0.11 transactions API question
I'm not sure if I understood correctly, but if you want to integrate a single Kafka producer transaction (or any transaction manager that only supports local transactions) into a distributed transaction, I think you can do so as long as all other involved transaction managers support 2-phase commit. In other words, you can include one (and only one) local-only transaction in a distributed transaction.

The steps the distributed transaction coordinator would have to take to commit the transaction would be:
1. call prepare on each of the transaction participants that support 2-phase commit
2. if any of them fails, abort all transactions; otherwise, proceed
3. call commit on the one transaction participant that does not support 2-phase commit (the Kafka producer in this case)
4. if that fails, abort all transactions; otherwise, proceed
5. call commit on all the transaction participants that support 2-phase commit (since prepare on these succeeded, they should not refuse to commit at this point)

So as to your concern about getting "clearance" (I take it as the equivalent of the "prepare" call) from the Kafka producer, you don't really need it IMO, as if commit fails on the Kafka producer, you can still abort the remaining transactions. Of course you can't do that if you have more than one transaction that doesn't support 2-phase commit in play.

Having said that, the advice these days seems to be to design distributed systems for eventual consistency, as using distributed transactions, while tempting, often leads to resource exhaustion as transaction managers have to go the extra mile to ensure they can commit any transaction that had prepare return successfully.

Just my 5c. I may be wrong in any of the above, please point it out if so.

Cheers,
Michał

On 19/06/17 14:57, Piotr Nowojski wrote:
Sorry for responding to my own message, but when I sent the original message/question I was not subscribed to this mailing list and now I cannot respond to Matthias's answer directly.

I don't want to share a transaction between multiple Producer threads/processes, I just would like to resume an interrupted transaction after a machine crash. Let me try to phrase the problem differently:

From the perspective of a producer that writes to Kafka, we have the following situation: we integrate the producer with a transaction in another system. A number of records should go together atomically (a transaction). Before committing the transaction, we frequently need to ask for a "clearance" status, and if we get the "go ahead" we want to commit the transaction. Unfortunately, as soon as we get that "clearance", we cannot reproduce the records any more (they are dropped from the original data stream). If something fails between the "go ahead" and the committing, we need to retry the transaction, so we need to come up again with all records. As a result we have to persist the records before we start the write transaction. That is a painful overhead to pay, and a lot of additional operational complexity.

The simplest way to support that pattern without extra overhead would be if we could "resume" a transaction:
- Each transaction has a unique Transaction ID
- If a crash of the producer occurs, the transaction is NOT aborted automatically.
- Instead, the restarted producer process reconnects to the transaction and decides to commit it or abort it.
- The transaction timeout aborts the transaction after a while of inactivity.

Maybe this could be easily supported?
Thanks, Piotrek 2017-06-16 17:59 GMT+02:00 Piotr Nowojski <piotr.nowoj...@gmail.com <mailto:piotr.nowoj...@gmail.com>>: But isn't it a low hanging fruit at this moment? Isn't that just an API limitation and wouldn't the backend for transactions support it with only minor changes to the API (do not fail automatically dangling transactions on Producer restart)? Flushing is already there so that _should_ handle the pre-commit. Again, maybe I'm missing something and for sure I am not familiar with Kafka's internals. Piotrek 2017-06-16 15:47 GMT+02:00 Michal Borowiecki <michal.borowie...@openbet.com <mailto:michal.borowie...@openbet.com>>: I don't think KIP-98 is as ambitious as to provide support for distributed transactions (2 phase commit). It would be great if I was wrong though :P Cheers, Michał On 16/06/17 14:21, Piotr Nowojski wrote: Hi, I'm looking into Kafka's transactions API as proposed in KIP-98. I've read both this KIP-98 document and I looked into the code that is on the master branch. I would like to use it to implement some two phase commit mechanism on top of the Kafka's transactions, that would allow me to tie multiple systems (some of them might not be
Re: Kafka Streams vs Spark Streaming : reduce by window
If confusion is the problem, then totally agree no point adding more knobs. Perhaps you're right that users don't /really/ want processing-time semantics. Just /think/ they want them until they start considering replay/catch-up scenarios. I guess people rarely think about those from the start (I sure didn't). Cheers, Michał On 16/06/17 17:54, Jay Kreps wrote: I think the question is when do you actually /want/ processing time semantics? There are definitely times when its safe to assume the two are close enough that a little lossiness doesn't matter much but it is pretty hard to make assumptions about when the processing time is and has been hard for us to think of a use case where its actually desirable. I think mostly what we've seen is confusion about the core concepts: * stream -- immutable events that occur * tables (including windows) -- current state of the world If the root problem is confusion adding knobs never makes it better. If the root problem is we're missing important use cases that justify the additional knobs then i think it's good to try to really understand them. I think there could be use cases around systems that don't take updates, example would be email, twitter, and some metrics stores. One solution that would be less complexity inducing than allowing new semantics, but might help with the use cases we need to collect, would be to add a new operator in the DSL. Something like .freezeAfter(30, TimeUnit.SECONDS) that collects all updates for a given window and both emits and enforces a single output after 30 seconds after the advancement of stream time and remembers that it is omitted, suppressing all further output (so the output is actually a KStream). This might or might not depend on wall clock time. Perhaps this is in fact what you are proposing? -Jay On Fri, Jun 16, 2017 at 2:38 AM, Michal Borowiecki <michal.borowie...@openbet.com <mailto:michal.borowie...@openbet.com>> wrote: I wonder if it's a frequent enough use case that Kafka Streams should consider providing this out of the box - this was asked for multiple times, right? Personally, I agree totally with the philosophy of "no final aggregation", as expressed by Eno's post, but IMO that is predicated totally on event-time semantics. If users want processing-time semantics then, as the docs already point out, there is no such thing as a late-arriving record - every record just falls in the currently open window(s), hence the notion of final aggregation makes perfect sense, from the usability point of view. The single abstraction of "stream time" proves leaky in some cases (e.g. for punctuate method - being addressed in KIP-138). Perhaps this is another case where processing-time semantics warrant explicit handling in the api - but of course, only if there's sufficient user demand for this. What I could imagine is a new type of time window (ProcessingTimeWindow?), that if used in an aggregation, the underlying processor would force the WallclockTimestampExtractor (KAFKA-4144 enables that) and would use the system-time punctuation (KIP-138) to send the final aggregation value once the window has expired and could be configured to not send intermediate updates while the window was open. Of course this is just a helper for the users, since they can implement it all themselves using the low-level API, as Matthias pointed out already. Just seems there's recurring interest in this. Again, this only makes sense for processing time semantics. 
For event-time semantics I find the arguments for "no final aggregation" totally convincing.

Cheers,
Michał

On 16/06/17 00:08, Matthias J. Sax wrote:
Hi Paolo,

This SO question might help, too:
https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable

For Streams, the basic model is based on "change" and we report updates to the "current" result immediately, reducing latency to a minimum.

"Last, if you say it's going to fall into the next window, you won't get event time semantics but you fall back to processing time semantics, and that cannot provide exact results"

If you really want to trade off correctness versus getting (late) updates and want to use processing time semantics, you should configure WallclockTimestampExtractor and implement an "update deduplication" operator using table.toStream().transform(). You can attach a state to your transformer and store all updates there (i.e., newer updates overwrite older updates). Punctuations allow you to emit "final" results for wi
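As a footnote to the WallclockTimestampExtractor suggestion quoted above: switching a Streams app to processing-time semantics is just a config change (a fragment to drop into the existing streams Properties; config key as of 0.10.2/0.11):

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

// inside the existing streams configuration:
// use wall-clock time instead of embedded record timestamps for windowing
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
          WallclockTimestampExtractor.class.getName());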
Re: Kafka 0.11 transactions API question
I don't think KIP-98 is as ambitious as to provide support for distributed transactions (2-phase commit). It would be great if I was wrong though :P

Cheers,
Michał

On 16/06/17 14:21, Piotr Nowojski wrote:
Hi,

I'm looking into Kafka's transactions API as proposed in KIP-98. I've read both the KIP-98 document and I looked into the code that is on the master branch. I would like to use it to implement a two-phase commit mechanism on top of Kafka's transactions, which would allow me to tie multiple systems (some of them might not be Kafka) into one transaction.

Maybe I'm missing something, but the problem is I don't see a way to implement it using the proposed Kafka transactions API. Even if I have just two processes writing to Kafka topics, I don't know how I can guarantee that if one's transaction is committed, the other will also eventually be committed. This is because if the first KafkaProducer successfully commits, but the second one fails before committing its data, after restart the second one's "initTransactions" call will (according to my understanding of the API) abort previously non-completed transactions.

Usually transactional systems expose an API like this <http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/>. Namely, there is a known identifier for a transaction and you can pre-commit it (the void prepare(...) method in the before-mentioned example) and then commit, or you can abort the transaction. Usually pre-commit involves flushing stuff to some temporary files, and commit moves those files to the final directory. In case of machine/process failure, if it was before "pre-commit", we can just roll back all transactions from all of the processes. However, once every process acknowledges that it completed "pre-commit", each process should call "commit". If some process fails at that stage, after restarting this process, I would expect to be able to restore its "pre-committed" transaction (having remembered the transaction's id) and re-attempt to commit it - which should be guaranteed to eventually succeed.

In other words, it seems to me like the missing features of this API for me are:
1. the possibility to resume transactions after a machine/process crash. At least I would expect to be able to commit "flushed"/"pre-committed" data for such transactions.
2. making sure that committing already-committed transactions doesn't break anything

Or maybe there is some other way to integrate Kafka into such a two-phase commit system that I'm missing?

Thanks,
Piotrek
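For context, this is the whole producer-side transaction lifecycle that KIP-98 exposes as of 0.11.0 (a sketch; the transactional id, topic and error handling are illustrative) - note the initTransactions() behaviour, which is exactly the point Piotrek raises:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // illustrative
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // illustrative
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        KafkaProducer<String, String> producer =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

        // fences older producers with the same transactional.id and, crucially for
        // this discussion, aborts any transaction left open by a previous run -
        // there is no way to "resume" it
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("output-topic", "key", "value"));  // illustrative
            producer.commitTransaction();
        } catch (Exception e) {
            // a real implementation would treat ProducerFencedException separately
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }
}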
Re: Kafka Streams vs Spark Streaming : reduce by window
ploring Kafka Streams and it's very powerful imho, even because the usage is pretty simple, but this scenario could be a gap compared with Spark.

Thanks,
Paolo.

Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor
Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>

From: Eno Thereska <eno.there...@gmail.com>
Sent: Thursday, June 15, 2017 1:45 PM
To: users@kafka.apache.org
Subject: Re: Kafka Streams vs Spark Streaming : reduce by window

Hi Paolo,

That is indeed correct. We don’t believe in closing windows in Kafka Streams. You could reduce the number of downstream records by using record caches: http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl. Alternatively you can just query the KTable whenever you want using the Interactive Query APIs (so when you query dictates what data you receive), see https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/

Thanks
Eno

On Jun 15, 2017, at 2:38 PM, Paolo Patierno <ppatie...@live.com> wrote:

Hi,

Using the streams library I noticed a difference (or there is a lack of knowledge on my side) with Apache Spark. Imagine the following scenario ...

I have a source topic where numeric values come in and I want to check the maximum value in the latest 5 seconds but ... putting the max value into a destination topic every 5 seconds. This is what happens with the reduceByWindow method in Spark. I'm using reduce on a KStream here that processes the max value taking into account previous values in the latest 5 seconds, but the final value is put into the destination topic for each incoming value.

For example ... an application sends numeric values every 1 second.

With Spark ... the source gets values every 1 second, processes the max in a window of 5 seconds, and puts the max into the destination every 5 seconds (so when the window ends). If the sequence is 21, 25, 22, 20, 26 the output will be just 26.

With Kafka Streams ... the source gets values every 1 second, processes the max in a window of 5 seconds, and puts the max into the destination every 1 second (so every time an incoming value arrives). Of course, if for example the sequence is 21, 25, 22, 20, 26 ... the output will be 21, 25, 25, 25, 26.

Is it possible with Kafka Streams? Or is it something to do at application level?

Thanks,
Paolo

Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor
Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>
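As a side note on Eno's record-cache suggestion: the number of intermediate updates a windowed aggregation emits downstream can be dialled down with two settings (a fragment for the existing streams Properties; the values are illustrative):

import org.apache.kafka.streams.StreamsConfig;

// inside the existing streams configuration:
// a larger cache and a longer commit interval mean fewer intermediate updates downstream
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // 10 MB, illustrative
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000L);                    // 5 s, illustrative

This doesn't "close" the window, but it batches updates so that, in the example above, most of the 21, 25, 25, 25 intermediates never leave the cache.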
Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL Permission of OffsetFetch
Hi Vahid,

+1 wrt OffsetFetch.

The "Additional Food for Thought" mentions Heartbeat as a non-mutating action. I don't think that's true, as the GroupCoordinator updates the latestHeartbeat field for the member and adds a new object to the heartbeatPurgatory; see completeAndScheduleNextHeartbeatExpiration() called from handleHeartbeat().

NB added dev mailing list back into CC as it seems to have been lost along the way.

Cheers,
Michał

On 12/06/17 18:47, Vahid S Hashemian wrote:
Hi Colin,
Thanks for the feedback.
To be honest, I'm not sure either why Read was selected instead of Write for mutating APIs in the initial design (I asked Ewen on the corresponding JIRA and he seemed unsure too). Perhaps someone who was involved in the design can clarify.
Thanks.
--Vahid

From: Colin McCabe <cmcc...@apache.org>
To: users@kafka.apache.org
Date: 06/12/2017 10:11 AM
Subject: Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL Permission of OffsetFetch

Hi Vahid,
I think you make a valid point that the ACLs controlling group operations are not very intuitive. This is probably a dumb question, but why are we using Read for mutating APIs? Shouldn't that be Write? The distinction between Describe and Read makes a lot of sense for Topics. A group isn't really something that you "read" from in the same way as a topic, so it always felt kind of weird there.
best,
Colin

On Thu, Jun 8, 2017, at 11:29, Vahid S Hashemian wrote:
Hi all,
I'm resending my earlier note hoping it would spark some conversation this time around :)
Thanks.
--Vahid

From: "Vahid S Hashemian" <vahidhashem...@us.ibm.com>
To: dev <d...@kafka.apache.org>, "Kafka User" <users@kafka.apache.org>
Date: 05/30/2017 08:33 AM
Subject: KIP-163: Lower the Minimum Required ACL Permission of OffsetFetch

Hi,
I started a new KIP to improve the minimum required ACL permissions of some of the APIs:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-163%3A+Lower+the+Minimum+Required+ACL+Permission+of+OffsetFetch
The KIP is to address KAFKA-4585. Feedback and suggestions are welcome!
Thanks.
--Vahid
Re: Re: synchronous request response using kafka
cc-ed users mailing list, as I think it's more appropriate for this thread.

Sanjay, if what you're after is the following pattern:
http://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReplyJmsExample.html
then yes, you can do this in Kafka. The outline would be similar to the JMS example above, but the specifics are of course different. Also, currently you would have to put the reply topic name and correlation id into the message value itself, but from v0.11 you can use custom headers for that.

Hope that helps,
Michał

On 12/05/17 22:02, Colin McCabe wrote:
Hi Sanjay,
Can you be a little clearer about what you are trying to achieve? If you want to build an RPC system where one entity makes a remote procedure call to another, you might consider using something like CORBA, Apache Thrift, gRPC, etc.
best,
Colin

On Fri, May 12, 2017, at 07:55, Banerjee, Sanjay wrote:
Can someone please share some thoughts on whether we can do a synchronous call (request response) using Kafka, similar to JMS?
Thanks
Sanjay
913-221-9164
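A sketch of the requesting side of that pattern using the 0.11 record headers mentioned above (the header names, topics and the fire-and-forget send are illustrative, not a recommendation):

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class RequestSender {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");                 // illustrative
        KafkaProducer<String, String> producer =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

        String correlationId = UUID.randomUUID().toString();
        ProducerRecord<String, String> request =
                new ProducerRecord<>("requests", "request payload");      // illustrative topic
        // from 0.11 onwards the reply-to topic and correlation id can travel as headers
        // instead of being embedded in the message value
        request.headers().add("reply-to", "replies".getBytes(StandardCharsets.UTF_8));
        request.headers().add("correlation-id", correlationId.getBytes(StandardCharsets.UTF_8));
        producer.send(request);
        producer.flush();
        // the replier copies correlation-id onto its response and produces it to "replies";
        // the requester consumes "replies" and matches on that header to complete the call
    }
}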
Re: Zookeeper on same server as Kafka
We are indeed running this setup in production, and have been for almost 2 years now, over a gradually increasing number of deployments.

Let me clarify though: our clusters don't exceed 5 nodes. We're not exercising Kafka anywhere near its limits, for bandwidth or disk I/O for that matter. When we reach the point where we want to scale Kafka independently from ZK, we will. The current setup is not set in stone. But for now everything is running under modest load, so I don't see the point of forgoing the simplicity of this until we actually have a need to scale further or put greater load on it.

YMMV, not everyone is running at the same scale as Heroku ;-)

Thanks,
Michal

On 04/06/17 11:34, Tom Crayford wrote:
Hi,

I would not recommend running this kind of setup in production. Busy Kafka brokers use up a lot of disk and network bandwidth, which zookeeper does not deal well with. This means that a burst of traffic to 1 node carries the risk of disrupting the ZK ensemble.

Secondly, this will cause problems down the line, because you will want to scale Kafka independently from ZK. ZK gets slower as you add nodes, but Kafka can scale out for quite a while. For production clusters, I'd recommend to always have 5 ZK nodes, but for Kafka, you can scale well past that, or keep it small while you are starting out.

Thanks,
Tom Crayford
Heroku Kafka

On Sat, Jun 3, 2017 at 8:20 AM, Michal Borowiecki <michal.borowie...@openbet.com <mailto:michal.borowie...@openbet.com>> wrote:
I'm not an expert, but I prefer keeping zookeepers on the same hosts as the Kafka brokers, mimicking each other's topology. The reason is to minimize the chance of e.g. kafka brokers being able to talk to one another but zookeepers not, or vice-versa. So, I'd say I /do/ want my kafka broker and the co-located zookeeper to go down together - for simplicity - and prefer that to some asymmetric failures to debug. This comes from past experience, albeit using other technologies, when relying on 2 different clustering mechanisms made failures in one but not the other very difficult to debug. Also, I think I read this advice somewhere a long time ago (don't recall where) and it made sense to me (given the prior experience) and we've never tried a different arrangement.

As to the overheads, I believe it's mostly disk IO and can hopefully be addressed by separate disks for each, but it's never been a bottleneck for us, so can't really say.

Thanks,
Michał

On 02/06/17 21:47, Mohammed Manna wrote:
Usually, the overhead comes when you have Kafka and Zookeeper doing the housekeeping (i.e. disk IO) on the same server. ZK even suggests that you should keep their logs on a totally different physical machine for better performance. Furthermore, if a mechanical failure occurs, you might not want both zookeeper and broker going down together.
Can anyone else chime in with some better points?

On 2 Jun 2017 7:57 pm, "Meghana Narasimhan" <mnarasim...@bandwidth.com> wrote:
Hi,
What are the pros and cons of setting up Zookeeper on the same server as the Kafka broker? Earlier, offsets were being written to zookeeper, which was a major overhead, but with offsets being written to Kafka now, what other requirements should be taken into consideration for setting up Zookeeper on the same server as Kafka vs having a separate zookeeper cluster?
Thanks,
Meghana
Re: Zookeeper on same server as Kafka
I'm not an expert, but I prefer keeping zookeepers on the same hosts as the Kafka brokers, mimicking each other's topology. The reason is to minimize the chance of e.g. kafka brokers being able to talk to one another but zookeepers not, or vice-versa. So, I'd say I /do/ want my kafka broker and the co-located zookeeper to go down together - for simplicity - and prefer that to some asymmetric failures to debug. This comes from past experience, albeit using other technologies, when relying on 2 different clustering mechanisms made failures in one but not the other very difficult to debug. Also, I think I read this advice somewhere a long time ago (don't recall where) and it made sense to me (given the prior experience) and we've never tried a different arrangement.

As to the overheads, I believe it's mostly disk IO and can hopefully be addressed by separate disks for each, but it's never been a bottleneck for us, so can't really say.

Thanks,
Michał

On 02/06/17 21:47, Mohammed Manna wrote:
Usually, the overhead comes when you have Kafka and Zookeeper doing the housekeeping (i.e. disk IO) on the same server. ZK even suggests that you should keep their logs on a totally different physical machine for better performance. Furthermore, if a mechanical failure occurs, you might not want both zookeeper and broker going down together.
Can anyone else chime in with some better points?

On 2 Jun 2017 7:57 pm, "Meghana Narasimhan" <mnarasim...@bandwidth.com> wrote:
Hi,
What are the pros and cons of setting up Zookeeper on the same server as the Kafka broker? Earlier, offsets were being written to zookeeper, which was a major overhead, but with offsets being written to Kafka now, what other requirements should be taken into consideration for setting up Zookeeper on the same server as Kafka vs having a separate zookeeper cluster?

Thanks,
Meghana
Re: Kafka Streams Usage Patterns
Hi all,

I've updated the wiki page with a draft pattern for consecutively growing time-windowed aggregations which was discussed some time ago on this mailing list. I'm yet to add the part that cleans up the stores using punctuations. Stay tuned.

On a somewhat similar subject, I've been working to implement the following requirements:
* transaction sums per customer session (simple, just extract non-expired session-windowed aggregates from a SessionStore using interactive queries)
* global transaction sums for all _currently active_ customer sessions

The second bit proved non-trivial, because session-windowed KTables (or any windowed KTables for that matter) don't notify downstream when a window expires. And I can't use punctuate until KIP-138 is implemented, because stream time punctuation is no good in this case (records can stop coming); reliable system time punctuation would be needed.

Below is how I implemented this; I'm yet to test it thoroughly. I wonder if anyone knows of an easier way of achieving the same. If so, I'm looking forward to suggestions. If not, I'll add that to the patterns wiki page too, in case someone else finds it useful.

builder
    .stream(/*key serde*/, /*transaction serde*/, "transaction-topic")
    .groupByKey(/*key serde*/, /*transaction serde*/)
    .aggregate(
        () -> /*empty aggregate*/,
        aggregator(),
        merger(),
        SessionWindows.with(SESSION_TIMEOUT_MS).until(SESSION_TIMEOUT_MS * 2),
        /* aggregate serde */,
        txPerCustomerSumStore() // this store can be queried for per customer session data
    )
    .toStream()
    .filter((key, value) -> value != null) // tombstones only come when a session is merged into a bigger session, so ignore them
    // the below map/groupByKey/reduce operations are to only propagate updates to the _latest_ session per customer to downstream
    .map((windowedCustomerId, agg) -> // this moves the timestamp from the windowed key into the value
                                      // so that we can group by customerId only and reduce to the later value
        new KeyValue<>(
            windowedCustomerId.key(), // just customerId
            new WindowedAggsImpl( // this is just like a tuple2 but with nicely named accessors: timestamp() and aggs()
                windowedCustomerId.window().end(),
                agg
            )
        )
    )
    .groupByKey( /*key serde*/, /*windowed aggs serde*/ ) // key is just customerId
    .reduce( // take the later session value and ignore any older - downstream only cares about _current_ sessions
        (val, agg) -> val.timestamp() > agg.timestamp() ? val : agg,
        TimeWindows.of(SESSION_TIMEOUT_MS).advanceBy(SESSION_TIMEOUT_DELAY_TOLERANCE_MS),
        "latest-session-windowed"
    )
    .groupBy((windowedCustomerId, timeAndAggs) -> // calculate totals with maximum granularity, which is per-partition
        new KeyValue<>(
            new Windowed<>(
                windowedCustomerId.key().hashCode() % PARTITION_COUNT_FOR_TOTALS, // KIP-159 would come in handy here, to access the partition number instead
                windowedCustomerId.window() // will use this in the interactive queries to pick the oldest not-yet-expired window
            ),
            timeAndAggs.aggs()
        ),
        new SessionKeySerde<>(Serdes.Integer()),
        /* aggregate serde */
    )
    .reduce(
        (val, agg) -> agg.add(val),
        (val, agg) -> agg.subtract(val),
        txTotalsStore() // this store can be queried to get totals per partition for all active sessions
    );

builder.globalTable(
    new SessionKeySerde<>(Serdes.Integer()),
    /* aggregate serde */,
    changelogTopicForStore(TRANSACTION_TOTALS), "totals");
// this global table puts per-partition totals on every node, so that they can be easily summed for global totals, picking the oldest not-yet-expired window

TODO: put in StreamPartitioners (with the KTable.through variants added in KAFKA-5045) to avoid re-partitioning where I know it's unnecessary.

The idea behind the % PARTITION_COUNT_FOR_TOTALS bit is that I want to first do summation with max parallelism and minimize the work needed downstream. So I calculate a per-partition sum first to limit the updates that the totals topic will receive and the summing work done by the interactive queries on the global store.

Is this a good way of going about it?

Thanks,
Michał

On 09/05/17 18:31, Matthias J. Sax wrote:
Hi,

I started a new Wiki page to collect some common usage patterns for Kafka Streams. Right now, it contains a quick example on "how to compute average". Hope we can collect more examples like this!

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns

-Matthias
Re: Partition assignment with multiple topics
Hi Mike, Are you performing any operations (e.g. joins) across all topics? If so, I'd think increasing the number of partitions is indeed the way to go. Partition is the unit of parallelism per topic and all topics are bound together in your app in that case. If not, your other option is to break up your application into a number of KafkaStreams instances, each dealing with a subset of topics. Hope that helps. Michał On 23/05/17 08:47, Mike Gould wrote: Hi We have a couple of hundred topics - each carrying a similar but distinct message type but to keep the total partition count down each only has 3 partitions. If I start Kafka-streams consuming all topics only 3 threads ever get assigned any partitions. I think the first thread to start gets the first partition of each topic, and so on until the 3rd thread, after that all the partitions are assigned - any further threads are just left idle. Is there any way to make the partition assignment smarter and either add a random element that moves partitions when further consumers start or considers all the partitions of subscribed topics together when assigning them? Our only alternative is creating many more partitions for each topic - and we're worried about how far this will scale. Thank you Mike G -- Signature <http://www.openbet.com/> Michal Borowiecki Senior Software Engineer L4 T: +44 208 742 1600 +44 203 249 8448 E: michal.borowie...@openbet.com W: www.openbet.com <http://www.openbet.com/> OpenBet Ltd Chiswick Park Building 9 566 Chiswick High Rd London W4 5XT UK <https://www.openbet.com/email_promo> This message is confidential and intended only for the addressee. If you have received this message in error, please immediately notify the postmas...@openbet.com <mailto:postmas...@openbet.com> and delete it from your system as well as any copies. The content of e-mails as well as traffic data may be monitored by OpenBet for employment and security purposes. To protect the environment please do not print this e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick Park Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A company registered in England and Wales. Registered no. 3134634. VAT no. GB927523612
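For the second option, a hedged sketch of splitting the application into several KafkaStreams instances, each subscribed to its own subset of topics, is shown below. Topic names, the application.id prefix and the pass-through processing are placeholders; the key point is that each group gets a distinct application.id, so each instance forms its own consumer group and has its partitions assigned independently of the others.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class PerGroupStreams {

    public static void main(String[] args) {
        // hypothetical split of the many topics into independent groups
        List<List<String>> topicGroups = Arrays.asList(
            Arrays.asList("orders-a", "orders-b"),
            Arrays.asList("payments-a", "payments-b"));

        for (int groupId = 0; groupId < topicGroups.size(); groupId++) {
            List<String> topics = topicGroups.get(groupId);

            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-group-" + groupId);  // distinct per group
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            KStreamBuilder builder = new KStreamBuilder();
            builder.stream(Serdes.String(), Serdes.String(), topics.toArray(new String[0]))
                   .mapValues(value -> value)   // placeholder for the real per-group processing
                   .to(Serdes.String(), Serdes.String(), "output-for-group-" + groupId);

            new KafkaStreams(builder, props).start();
        }
    }
}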
Re: Order of punctuate() and process() in a stream processor
Hi Sini, This is beyond the score of KIP-138 but https://issues.apache.org/jira/browse/KAFKA-3514 exists to track such improvements Thanks, Michal On 17 May 2017 5:10 p.m., Peter Sinoros Szabowrote: Hi, I see, now its clear why the repeated punctuations use the same time value in that case. Do you have a JIRA ticket to track improvement ideas for that? It would be great to have an option to: - advance the stream time before calling the process() on a new record - this would prevent to process a message in the wrong punctuation "segment". - use fine grained advance of stream time for the "missed" punctuations - this would ease the processing of burst messages after some silence. I do not see if KIP-138 may solve this or not. Regards -Sini From: "Matthias J. Sax" To: users@kafka.apache.org Date: 2017/05/12 19:19 Subject:Re: Order of punctuate() and process() in a stream processor Thanks for sharing. As punctuate is called with "streams time" you see the same time value multiple times. It's again due to the coarse grained advance of "stream time". @Thomas: I think, the way we handle it just simplifies the implementation of punctuations. I don't see any other "advantage". I will create a JIRA to track this -- we are currently working on some improvements of punctuation and time management already, and it seems to be another valuable improvement. -Matthias On 5/12/17 10:07 AM, Peter Sinoros Szabo wrote: > Well, this is also a good question, because it is triggered with the same > timestamp 3 times, so in order to create my update for both three seconds, > I will have to count the number of punctuations and calculate the missed > stream times for myself. It's ok for me to trigger it 3 times, but the > timestamp should not be the same in each, but should be increased by the > schedule time in each punctuate. > > - Sini > > > > From: Thomas Becker > To: "users@kafka.apache.org" > Date: 2017/05/12 18:57 > Subject:RE: Order of punctuate() and process() in a stream > processor > > > > I'm a bit troubled by the fact that it fires 3 times despite the stream > time being advanced all at once; is there a scenario when this is > beneficial? > > > From: Matthias J. Sax [matth...@confluent.io] > Sent: Friday, May 12, 2017 12:38 PM > To: users@kafka.apache.org > Subject: Re: Order of punctuate() and process() in a stream processor > > Hi Peter, > > It's by design. Streams internally tracks time progress (so-called > "streams time"). "streams time" get advanced *after* processing a record. > > Thus, in your case, "stream time" is still at its old value before it > processed the first message of you send "burst". After that, "streams > time" is advanced by 3 seconds, and thus, punctuate fires 3 time. > > I guess, we could change the design and include scheduled punctuations > when advancing "streams time". But atm, we just don't do this. > > Does this make sense? > > Is this critical for your use case? Or do you just want to understand > what's happening? > > > -Matthias > > > On 5/12/17 8:59 AM, Peter Sinoros Szabo wrote: >> Hi, >> >> >> Let's assume the following case. 
>> - a stream processor that uses the Processor API >> - context.schedule(1000) is called in the init() >> - the processor reads only one topic that has one partition >> - using custom timestamp extractor, but that timestamp is just a wall >> clock time >> >> >> Image the following events: >> 1., for 10 seconds I send in 5 messages / second >> 2., does not send any messages for 3 seconds >> 3., starts the 5 messages / second again >> >> I see that punctuate() is not called during the 3 seconds when I do not >> send any messages. This is ok according to the documentation, because >> there is not any new messages to trigger the punctuate() call. When the >> first few messages arrives after a restart the sending (point 3. above) > I >> see the following sequence of method calls: >> >> 1., process() on the 1st message >> 2., punctuate() is called 3 times >> 3., process() on the 2nd message >> 4., process() on each following message >> >> What I would expect instead is that punctuate() is called first and then >> process() is called on the messages, because the first message's > timestamp >> is already 3 seconds older then the last punctuate() was called, so the >> first message belongs after the 3 punctuate() calls. >> >> Please let me know if this is a bug or intentional, in this case what is >> the reason for processing one message before punctuate() is called? >> >> >> Thanks, >> Peter >> >> Péter Sinóros-Szabó >> Software Engineer >> >> Ustream, an IBM Company >> Andrassy ut 39, H-1061 Budapest >> Mobile: +36203693050 >> Email: peter.sinoros-sz...@hu.ibm.com >> > > > > This email
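For the burst-after-silence case described above, one possible workaround (until the punctuation semantics are improved) is for the processor itself to remember when it last punctuated and to derive a distinct timestamp for each of the back-to-back punctuate() calls. The sketch below uses the pre-KIP-138 Processor API and assumes a 1-second schedule; the actual periodic work is omitted.

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class BurstAwareProcessor implements Processor<String, String> {

    private static final long INTERVAL_MS = 1000L;
    private long lastPunctuationTime = -1L;
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        context.schedule(INTERVAL_MS);   // old-style stream-time punctuation
    }

    @Override
    public void process(String key, String value) {
        // normal per-record processing goes here
    }

    @Override
    public void punctuate(long streamTime) {
        // When a burst advances stream time by several intervals at once, punctuate()
        // is called repeatedly with the same streamTime. Derive our own timestamp so
        // each call covers a distinct interval instead of repeating the same one.
        long effectiveTime = (lastPunctuationTime < 0) ? streamTime : lastPunctuationTime + INTERVAL_MS;
        if (effectiveTime > streamTime) {
            effectiveTime = streamTime;   // don't run ahead of stream time
        }
        lastPunctuationTime = effectiveTime;
        // do the periodic work for the interval ending at effectiveTime, e.g. context.forward(...)
    }

    @Override
    public void close() { }
}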
Re: How to chain increasing window operations one after another
Just had a thought: If you implement the Windowed/Tuple serde to store the timestamp(s) before the actual record key then you can simply periodically do a ranged query on each of the state stores to find and delete all data older than ... (using punctuate() inside a Processor). Any downsides to that? Cheers, Michał On 09/05/17 09:17, Michal Borowiecki wrote: Hi Matthias, Yes, the ever growing stores were my concern too. That was the intention behind my TODO note in the first reply just didn't want to touch on this until I've dug deeper into it. I understand compaction+retention policy on the backing changelog topics takes care of cleaning up on the broker-side but Rocks dbs will grow indefinitely, right? (until re-balanced?) Punctuation was the first idea that came to my mind too when originally faced this problem on my project. However, as you said it's only on KStream and aggregations on KStream actually discard tombstones and don't forward them on to the KTable: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java#L798-L799 * Aggregate the values of records in this stream by the grouped key. * Records with {@code null} key or value are ignored. I haven't come up with a satisfactory solution yet, but it's still on my mind. TTLs on stores could potentially solve this issue and just today they were asked about on SO: http://stackoverflow.com/questions/43860114/kafka-streams-low-level-processor-api-rocksdb-timetolivettl/43862922#43862922 Garrett, was that you? :-) Thanks, Michał On 08/05/17 23:29, Matthias J. Sax wrote: Thinking about this once more (and also having a fresh memory of another thread about KTables), I am wondering if this approach needs some extra tuning: As the result of the first window aggregation produces an output stream with unbounded key space, the following (non-windowed) KTables would grow indefinitely, if I don't miss anything. Thus, it might be required to put a transform() that only forwards all data 1-to-1, but additionally registers a punctuation schedule. When punctuation is called, it would be required to send tombstone messages downstream (or a simliar) that deletes windows that are older than the retention time. Sound tricky to implement though... `transform()` would need to keep track of used keys to send appropriate tombstones in an custom state. Also. `transform` is only available for KStream and transforming (windowed) KTable into KStream into KTable while preserving the required semantics seems not to be straight forwards. Any thoughts about this potential issue? -Matthias On 5/8/17 3:05 PM, Garrett Barton wrote: Michael, This is slick! I am still writing unit tests to verify it. My code looks something like: KTable<Windowed, CountSumMinMaxAvgObj> oneMinuteWindowed = srcStream// my val object isnt really called that, just wanted to show a sample set of calculations the value can do! .groupByKey(Serdes.String(), Serdes.Double()) .aggregate(/*initializer */, /* aggregator */, TimeWindows.of(60*1000, 60*1000), "store1m"); // i used an aggregate here so I could have a non-primitive value object that does the calculations on each aggregator, pojo has an .add(Double) in it. 
KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> fiveMinuteWindowed = oneMinuteWindowed// I made my own Tuple2, will move window calc into it .groupBy( (windowedKey, value) -> new KeyValue<>(new Tuple2<String, Long>(windowedKey.key(), windowedKey.window().end() /1000/60/5 *1000*60*5), value, keySerde, valSerde) // the above rounds time down to a timestamp divisible by 5 minutes .reduce(/*your adder*/, /*your subtractor*/, "store5m"); // where your subtractor can be as simple as (val, agg) -> agg - val for primitive types or as complex as you need, // just make sure you get the order right (lesson hard learnt ;) ), subtraction is not commutative! // again my val object has an .add(Obj) and a .sub() to handle this, so nice! KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> fifteenMinuteWindowed = fiveMinuteWindowed .groupBy( (keyPair, value) -> new KeyValue<>(new Tuple2(keyPair._1, keyPair._2 /1000/60/15 *1000*60*15), value, keySerde, valSerde) // the above rounds time down to a timestamp divisible by 15 minutes .reduce(/*your adder*/, /*your subtractor*/, "store15m"); KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> sixtyMinuteWindowed = fifteeenMinuteWindowed .groupBy( (keyPair, value) -> new KeyValue<>(new Tuple2(keyPairair._1, pair._2 /1000/60/60 *1000*60*60), value, keySerde, valSerde) // the above rounds time down to a timestamp divisible by 60 minutes .reduce(/*your adder*/, /*your subtractor*/, "
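Coming back to the ranged-query clean-up idea raised at the top of this message: assuming the store's key serde is written so that the bucket timestamp serialises before the actual key (so byte order equals time order), a clean-up processor might look roughly like the sketch below. The store name, the TimestampedKey type and its first()/lastBefore() boundary helpers, and the retention period are all hypothetical placeholders, and the approach relies on the range() semantics of the underlying RocksDB store.

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.ArrayList;
import java.util.List;

public class StoreCleanupProcessor implements Processor<Object, Object> {

    private static final long RETENTION_MS = 60 * 60 * 1000L;
    private KeyValueStore<TimestampedKey, Object> store;   // TimestampedKey = (bucket timestamp, real key), timestamp first

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        store = (KeyValueStore<TimestampedKey, Object>) context.getStateStore("store5m");
        context.schedule(RETENTION_MS);   // periodic clean-up on stream time (pre-KIP-138)
    }

    @Override
    public void process(Object key, Object value) {
        // pass-through; the real aggregation stays elsewhere in the topology
    }

    @Override
    public void punctuate(long streamTime) {
        long cutoff = streamTime - RETENTION_MS;
        // collect expired keys first, then delete, to avoid mutating the store while iterating
        List<TimestampedKey> expired = new ArrayList<>();
        try (KeyValueIterator<TimestampedKey, Object> it =
                 store.range(TimestampedKey.first(), TimestampedKey.lastBefore(cutoff))) {
            while (it.hasNext()) {
                expired.add(it.next().key);
            }
        }
        for (TimestampedKey key : expired) {
            store.delete(key);   // removes the expired buckets from the store and its changelog
        }
    }

    @Override
    public void close() { }
}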
Re: How to chain increasing window operations one after another
. Serdes gets nuts as well as the Generic typing on some of these classes (yea you KeyValueMapper), makes for long code! I had to specify them everywhere since the key/val's changed. I didn't get enough time to mess with it today, I will wrap up the unit tests and run it to see how it performs against my real data as well tomorrow. I expect a huge reduction in resources (both streams and kafka storage) by moving to this. Thank you! On Mon, May 8, 2017 at 5:26 PM, Matthias J. Sax <matth...@confluent.io> wrote: Michal, that's an interesting idea. In an ideal world, Kafka Streams should have an optimizer that is able to to this automatically under the hood. Too bad we are not there yet. @Garret: did you try this out? This seems to be a question that might affect many users, and it might we worth to document it somewhere as a recommended pattern. -Matthias On 5/8/17 1:43 AM, Michal Borowiecki wrote: Apologies, In the code snippet of course only oneMinuteWindowed KTable will have a Windowed key (KTable<Windowed, Value>), all others would be just KTable<Tuple2<Key, Long>, Value>. Michał On 07/05/17 16:09, Michal Borowiecki wrote: Hi Garrett, I've encountered a similar challenge in a project I'm working on (it's still work in progress, so please take my suggestions with a grain of salt). Yes, I believe KTable.groupBy lets you accomplish what you are aiming for with something like the following (same snippet attached as txt file): KTable<Windowed, Value> oneMinuteWindowed = yourStream// where Key and Value stand for your actual key and value types .groupByKey() .reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000), "store1m"); //where your adder can be as simple as (val, agg) -> agg + val //for primitive types or as complex as you need KTable<Windowed<Tuple2<Key, Long>>, Value> fiveMinuteWindowed = oneMinuteWindowed// Tuple2 for this example as defined by javaslang library .groupBy( (windowedKey, value) -> new KeyValue<>(new Tuple2<>(windowedKey.key(), windowedKey.window().end() /1000/60/5 *1000*60*5), value) // the above rounds time down to a timestamp divisible by 5 minutes .reduce(/*your adder*/, /*your subtractor*/, "store5m"); // where your subtractor can be as simple as (val, agg) -> agg - valfor primitive types or as complex as you need, // just make sure you get the order right (lesson hard learnt ;) ), subtraction is not commutative! KTable<Windowed<Tuple2<Key, Long>>, Value> fifteenMinuteWindowed = fiveMinuteWindowed .groupBy( (keyPair, value) -> new KeyValue<>(new Tuple2(keyPair._1, keyPair._2/1000/60/15 *1000*60*15), value) // the above rounds time down to a timestamp divisible by 15 minutes .reduce(/*your adder*/, /*your subtractor*/, "store15m"); KTable<Windowed<Tuple2<Key, Long>>, Value> sixtyMinuteWindowed = fifteeenMinuteWindowed .groupBy( (keyPair, value) -> new KeyValue<>(new Tuple2(keyPairair._1, pair._2 /1000/60/60 *1000*60*60), value) // the above rounds time down to a timestamp divisible by 5 minutes .reduce(/*your adder*/, /*your subtractor*/, "store60m"); So, step by step: * You use a windowed aggregation only once, from there on you use the KTable abstraction only (which doesn't have windowed aggregations). * In each subsequent groupBy you map the key to a pair of (your-real-key, timestamp) where the timestamp is rounded down with the precision of the size of the new window. 
* reduce() on a KGroupedTable takes an adder and a subtractor and it will correctly update the new aggregate by first subtracting the previous value of the upstream record before adding the new value (this way, just as you said, the downstream is aware of the statefulness of the upstream and correctly treats each record as an update) * If you want to reduce message volume further, you can break these into separate KafkaStreams instances and configure downstream ones with a higher commit.interval.ms (unfortunately you can't have different values of this setting in different places of the same topology I'm afraid) * TODO: Look into retention policies, I haven't investigated that in any detail. I haven't tested this exact code, so please excuse any typos. Also, if someone with more experience could chip in and check if I'm not talking nonsense here, or if there's an easier way to this, that would be great. I don't know if the alternative approach is possible, where you convert each resulting KTable back into a stream and just do a windowed aggregation somehow. That would feel more natural, but I haven't figured out how to correctly window over a changelog in the KStream abstraction, feels impossible in the high
Re: How to chain increasing window operations one after another
This seems to be a question that might affect many users, and it might we worth to document it somewhere as a recommended pattern. I was thinking the same thing :) How about a page on the wiki listing useful patterns with subpages for each patten in detail? (like for KIPs) Thanks, Michał On 08/05/17 22:26, Matthias J. Sax wrote: Michal, that's an interesting idea. In an ideal world, Kafka Streams should have an optimizer that is able to to this automatically under the hood. Too bad we are not there yet. @Garret: did you try this out? This seems to be a question that might affect many users, and it might we worth to document it somewhere as a recommended pattern. -Matthias On 5/8/17 1:43 AM, Michal Borowiecki wrote: Apologies, In the code snippet of course only oneMinuteWindowed KTable will have a Windowed key (KTable<Windowed, Value>), all others would be just KTable<Tuple2<Key, Long>, Value>. Michał On 07/05/17 16:09, Michal Borowiecki wrote: Hi Garrett, I've encountered a similar challenge in a project I'm working on (it's still work in progress, so please take my suggestions with a grain of salt). Yes, I believe KTable.groupBy lets you accomplish what you are aiming for with something like the following (same snippet attached as txt file): KTable<Windowed, Value> oneMinuteWindowed = yourStream// where Key and Value stand for your actual key and value types .groupByKey() .reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000), "store1m"); //where your adder can be as simple as (val, agg) -> agg + val //for primitive types or as complex as you need KTable<Windowed<Tuple2<Key, Long>>, Value> fiveMinuteWindowed = oneMinuteWindowed// Tuple2 for this example as defined by javaslang library .groupBy( (windowedKey, value) -> new KeyValue<>(new Tuple2<>(windowedKey.key(), windowedKey.window().end() /1000/60/5 *1000*60*5), value) // the above rounds time down to a timestamp divisible by 5 minutes .reduce(/*your adder*/, /*your subtractor*/, "store5m"); // where your subtractor can be as simple as (val, agg) -> agg - valfor primitive types or as complex as you need, // just make sure you get the order right (lesson hard learnt ;) ), subtraction is not commutative! KTable<Windowed<Tuple2<Key, Long>>, Value> fifteenMinuteWindowed = fiveMinuteWindowed .groupBy( (keyPair, value) -> new KeyValue<>(new Tuple2(keyPair._1, keyPair._2/1000/60/15 *1000*60*15), value) // the above rounds time down to a timestamp divisible by 15 minutes .reduce(/*your adder*/, /*your subtractor*/, "store15m"); KTable<Windowed<Tuple2<Key, Long>>, Value> sixtyMinuteWindowed = fifteeenMinuteWindowed .groupBy( (keyPair, value) -> new KeyValue<>(new Tuple2(keyPairair._1, pair._2 /1000/60/60 *1000*60*60), value) // the above rounds time down to a timestamp divisible by 5 minutes .reduce(/*your adder*/, /*your subtractor*/, "store60m"); So, step by step: * You use a windowed aggregation only once, from there on you use the KTable abstraction only (which doesn't have windowed aggregations). * In each subsequent groupBy you map the key to a pair of (your-real-key, timestamp) where the timestamp is rounded down with the precision of the size of the new window. 
* reduce() on a KGroupedTable takes an adder and a subtractor and it will correctly update the new aggregate by first subtracting the previous value of the upstream record before adding the new value (this way, just as you said, the downstream is aware of the statefulness of the upstream and correctly treats each record as an update) * If you want to reduce message volume further, you can break these into separate KafkaStreams instances and configure downstream ones with a higher commit.interval.ms (unfortunately you can't have different values of this setting in different places of the same topology I'm afraid) * TODO: Look into retention policies, I haven't investigated that in any detail. I haven't tested this exact code, so please excuse any typos. Also, if someone with more experience could chip in and check if I'm not talking nonsense here, or if there's an easier way to this, that would be great. I don't know if the alternative approach is possible, where you convert each resulting KTable back into a stream and just do a windowed aggregation somehow. That would feel more natural, but I haven't figured out how to correctly window over a changelog in the KStream abstraction, feels impossible in the high-level DSL. Hope that helps, Michal On 02/05/17 18:03, Garrett Barton wrote: Lets say I want to sum values over increasing window sizes of 1,5,15,60 minutes. Right now I have the
Re: How to chain increasing window operations one after another
Apologies, In the code snippet of course only oneMinuteWindowed KTable will have a Windowed key (KTable<Windowed, Value>), all others would be just KTable<Tuple2<Key, Long>, Value>. Michał On 07/05/17 16:09, Michal Borowiecki wrote: Hi Garrett, I've encountered a similar challenge in a project I'm working on (it's still work in progress, so please take my suggestions with a grain of salt). Yes, I believe KTable.groupBy lets you accomplish what you are aiming for with something like the following (same snippet attached as txt file): KTable<Windowed, Value> oneMinuteWindowed = yourStream// where Key and Value stand for your actual key and value types .groupByKey() .reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000), "store1m"); //where your adder can be as simple as (val, agg) -> agg + val //for primitive types or as complex as you need KTable<Windowed<Tuple2<Key, Long>>, Value> fiveMinuteWindowed = oneMinuteWindowed// Tuple2 for this example as defined by javaslang library .groupBy( (windowedKey, value) -> new KeyValue<>(new Tuple2<>(windowedKey.key(), windowedKey.window().end() /1000/60/5 *1000*60*5), value) // the above rounds time down to a timestamp divisible by 5 minutes .reduce(/*your adder*/, /*your subtractor*/, "store5m"); // where your subtractor can be as simple as (val, agg) -> agg - valfor primitive types or as complex as you need, // just make sure you get the order right (lesson hard learnt ;) ), subtraction is not commutative! KTable<Windowed<Tuple2<Key, Long>>, Value> fifteenMinuteWindowed = fiveMinuteWindowed .groupBy( (keyPair, value) -> new KeyValue<>(new Tuple2(keyPair._1, keyPair._2/1000/60/15 *1000*60*15), value) // the above rounds time down to a timestamp divisible by 15 minutes .reduce(/*your adder*/, /*your subtractor*/, "store15m"); KTable<Windowed<Tuple2<Key, Long>>, Value> sixtyMinuteWindowed = fifteeenMinuteWindowed .groupBy( (keyPair, value) -> new KeyValue<>(new Tuple2(keyPairair._1, pair._2 /1000/60/60 *1000*60*60), value) // the above rounds time down to a timestamp divisible by 5 minutes .reduce(/*your adder*/, /*your subtractor*/, "store60m"); So, step by step: * You use a windowed aggregation only once, from there on you use the KTable abstraction only (which doesn't have windowed aggregations). * In each subsequent groupBy you map the key to a pair of (your-real-key, timestamp) where the timestamp is rounded down with the precision of the size of the new window. * reduce() on a KGroupedTable takes an adder and a subtractor and it will correctly update the new aggregate by first subtracting the previous value of the upstream record before adding the new value (this way, just as you said, the downstream is aware of the statefulness of the upstream and correctly treats each record as an update) * If you want to reduce message volume further, you can break these into separate KafkaStreams instances and configure downstream ones with a higher commit.interval.ms (unfortunately you can't have different values of this setting in different places of the same topology I'm afraid) * TODO: Look into retention policies, I haven't investigated that in any detail. I haven't tested this exact code, so please excuse any typos. Also, if someone with more experience could chip in and check if I'm not talking nonsense here, or if there's an easier way to this, that would be great. I don't know if the alternative approach is possible, where you convert each resulting KTable back into a stream and just do a windowed aggregation somehow. 
That would feel more natural, but I haven't figured out how to correctly window over a changelog in the KStream abstraction, feels impossible in the high-level DSL. Hope that helps, Michal On 02/05/17 18:03, Garrett Barton wrote: Lets say I want to sum values over increasing window sizes of 1,5,15,60 minutes. Right now I have them running in parallel, meaning if I am producing 1k/sec records I am consuming 4k/sec to feed each calculation. In reality I am calculating far more than sum, and in this pattern I'm looking at something like (producing rate)*(calculations)*(windows) for a consumption rate. So I had the idea, could I feed the 1 minute window into the 5 minute, and 5 into 15, and 15 into 60. Theoretically I would consume a fraction of the records, not have to scale as huge and be back to something like (producing rate)*(calculations)+(updates). Thinking this is an awesome idea I went to try and implement it and got twisted around. These are windowed grouping operations that produce KTables, which means instead of a raw stre
Re: How to chain increasing window operations one after another
he latest values for say the 5 1 minute sum's in a given window, to perform the 5 minute sum. Reading the docs which are awesome, I cannot determine if the KTable.groupby() would work over a window, and would reduce or aggregate thus do what I need? Any ideas? -- Signature <http://www.openbet.com/> Michal Borowiecki Senior Software Engineer L4 T: +44 208 742 1600 +44 203 249 8448 E: michal.borowie...@openbet.com W: www.openbet.com <http://www.openbet.com/> OpenBet Ltd Chiswick Park Building 9 566 Chiswick High Rd London W4 5XT UK <https://www.openbet.com/email_promo> This message is confidential and intended only for the addressee. If you have received this message in error, please immediately notify the postmas...@openbet.com <mailto:postmas...@openbet.com> and delete it from your system as well as any copies. The content of e-mails as well as traffic data may be monitored by OpenBet for employment and security purposes. To protect the environment please do not print this e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick Park Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A company registered in England and Wales. Registered no. 3134634. VAT no. GB927523612

KTable<Windowed<Key>, Value> oneMinuteWindowed = yourStream          // where Key and Value stand for your actual key and value types
    .groupByKey()
    .reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000), "store1m");
    // where your adder can be as simple as (val, agg) -> agg + val for primitive types or as complex as you need

KTable<Tuple2<Key, Long>, Value> fiveMinuteWindowed = oneMinuteWindowed     // Tuple2 for this example as defined by the javaslang library
    .groupBy((windowedKey, value) ->
        new KeyValue<>(new Tuple2<>(windowedKey.key(), windowedKey.window().end() /1000/60/5 *1000*60*5), value))
        // the above rounds time down to a timestamp divisible by 5 minutes
    .reduce(/*your adder*/, /*your subtractor*/, "store5m");
    // where your subtractor can be as simple as (val, agg) -> agg - val for primitive types or as complex as you need,
    // just make sure you get the order right (lesson hard learnt ;) ), subtraction is not commutative!

KTable<Tuple2<Key, Long>, Value> fifteenMinuteWindowed = fiveMinuteWindowed
    .groupBy((keyPair, value) ->
        new KeyValue<>(new Tuple2<>(keyPair._1, keyPair._2 /1000/60/15 *1000*60*15), value))
        // the above rounds time down to a timestamp divisible by 15 minutes
    .reduce(/*your adder*/, /*your subtractor*/, "store15m");

KTable<Tuple2<Key, Long>, Value> sixtyMinuteWindowed = fifteenMinuteWindowed
    .groupBy((keyPair, value) ->
        new KeyValue<>(new Tuple2<>(keyPair._1, keyPair._2 /1000/60/60 *1000*60*60), value))
        // the above rounds time down to a timestamp divisible by 60 minutes
    .reduce(/*your adder*/, /*your subtractor*/, "store60m");
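The rounding expression used repeatedly above (window().end() /1000/60/5 *1000*60*5 and its 15- and 60-minute variants) is just integer division used to floor a millisecond timestamp to the bucket size. A small helper makes the intent explicit; the class and method names here are of course arbitrary.

public final class TimeBuckets {

    private TimeBuckets() { }

    /** Floors a millisecond timestamp down to a multiple of bucketSizeMs. */
    public static long roundDownTo(long timestampMs, long bucketSizeMs) {
        return (timestampMs / bucketSizeMs) * bucketSizeMs;
    }

    public static void main(String[] args) {
        long t = 1494256823456L;               // some arbitrary epoch-millis timestamp
        long fiveMinutes = 5 * 60 * 1000L;
        // identical to t /1000/60/5 *1000*60*5 from the snippets above
        System.out.println(roundDownTo(t, fiveMinutes));
    }
}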
Re: Does Kafka producer waits till previous batch returns responce before sending next one?
Yes, that's what the docs say in both places: max.in.flight.requests.per.connection The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled). retries Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting|max.in.flight.requests.per.connection|to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first. Cheers, Michał On 30/04/17 19:32, Jun MA wrote: Does this mean that if the client have retry > 0 and max.in.flight.requests.per.connection > 1, then even if the topic only have one partition, there’s still no guarantee of the ordering? Thanks, Jun On Apr 30, 2017, at 7:57 AM, Hans Jespersen <h...@confluent.io> wrote: There is a parameter that controls this behavior called max.in. flight.requests.per.connection If you set max.in. flight.requests.per.connection = 1 then the producer waits until previous produce requests returns a response before sending the next one (or retrying). The retries parameter controller the number of times to attempt to produce a batch after a failure. If flight.requests.per.connection = 1 and retries is get to the maximum then ordering is guaranteed. If there is a timeout then the producer library would try again and again to produce the message and will not skip over to try and produce the next message. If you set flight.requests.per.connection > 1 (I think the default is 5) then you can get a commit log with messages out of order wrt the original published order (because retries are done in parallel rather then in series) -hans On Apr 30, 2017, at 3:13 AM, Petr Novak <oss.mli...@gmail.com> wrote: Hello, Does Kafka producer waits till previous batch returns response before sending next one? Do I assume correctly that it does not when retries can change ordering? Hence batches delay is introduced only by producer internal send loop time and linger? If a timeout would be localized only to a single batch send request for some reason, does it affect the next batch (assuming this batch can go through successfully)? Many thanks, Petr -- Signature <http://www.openbet.com/> Michal Borowiecki Senior Software Engineer L4 T: +44 208 742 1600 +44 203 249 8448 E: michal.borowie...@openbet.com W: www.openbet.com <http://www.openbet.com/> OpenBet Ltd Chiswick Park Building 9 566 Chiswick High Rd London W4 5XT UK <https://www.openbet.com/email_promo> This message is confidential and intended only for the addressee. If you have received this message in error, please immediately notify the postmas...@openbet.com <mailto:postmas...@openbet.com> and delete it from your system as well as any copies. The content of e-mails as well as traffic data may be monitored by OpenBet for employment and security purposes. To protect the environment please do not print this e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick Park Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A company registered in England and Wales. Registered no. 3134634. VAT no. GB927523612
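Putting the two quoted settings together, a producer configured for strict per-partition ordering in the face of retries could look roughly like the sketch below. The broker address, topic name and String serializers are placeholders, and acks=all is included only as a common companion setting for durability, not as part of the ordering discussion.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class OrderedProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // only one unacknowledged request per connection, so retries cannot reorder batches
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        // retry transient failures instead of giving up on the batch
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}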
Re: Caching in Kafka Streams to ignore garbage message
Apologies, I must have not made myself clear. I meant the values in the records coming from the input topic (which in turn are coming from kafka connect in the example at hand) and not the records coming out of the join. My intention was to warn against sending null values from kafka connect to the topic that is then meant to be read-in as a ktable to filter against. Am I clearer now? Cheers, Michał On 30/04/17 18:14, Matthias J. Sax wrote: Your observation is correct. If you use inner KStream-KTable join, the join will implement the filter automatically as the join will not return any result. -Matthias On 4/30/17 7:23 AM, Michal Borowiecki wrote: I have something working on the same principle (except not using connect), that is, I put ids to filter on into a ktable and then (inner) join a kstream with that ktable. I don't believe the value can be null though. In a changlog null value is interpreted as a delete so won't be put into a ktable. The RocksDB store, for one, does this: private void putInternal(byte[] rawKey, byte[] rawValue) { if (rawValue == null) { try { db.delete(wOptions, rawKey); But any non-null value would do. Please correct me if miss-understood. Cheers, Michał On 27/04/17 22:44, Matthias J. Sax wrote: I'd like to avoid repeated trips to the db, and caching a large amount of data in memory. Lookups to the DB would be hard to get done anyway. Ie, it would not perform well, as all your calls would need to be synchronous... Is it possible to send a message w/ the id as the partition key to a topic, and then use the same id as the key, so the same node which will receive the data for an id is the one which will process it? That is what I did propose (maybe it was not clear). If you use Connect, you can just import the ID into Kafka and leave the value empty (ie, null). This reduced you cache data to a minimum. And the KStream-KTable join work as you described it :) -Matthias On 4/27/17 2:37 PM, Ali Akhtar wrote: I'd like to avoid repeated trips to the db, and caching a large amount of data in memory. Is it possible to send a message w/ the id as the partition key to a topic, and then use the same id as the key, so the same node which will receive the data for an id is the one which will process it? On Fri, Apr 28, 2017 at 2:32 AM, Matthias J. Sax <matth...@confluent.io> wrote: The recommended solution would be to use Kafka Connect to load you DB data into a Kafka topic. With Kafka Streams you read your db-topic as KTable and do a (inne) KStream-KTable join to lookup the IDs. -Matthias On 4/27/17 2:22 PM, Ali Akhtar wrote: I have a Kafka topic which will receive a large amount of data. This data has an 'id' field. I need to look up the id in an external db, see if we are tracking that id, and if yes, we process that message, if not, we ignore it. 99% of the data will be for ids which are not being tracked - 1% or so will be for ids which are tracked. My concern is, that there'd be a lot of round trips to the db made just to check the id, and if it'd be better to cache the ids being tracked somewhere, so other ids are ignored. I was considering sending a message to another (or the same topic) whenever a new id is added to the track list, and that id should then get processed on the node which will process the messages. Should I just cache all ids on all nodes (which may be a large amount), or is there a way to only cache the id on the same kafka streams node which will receive data for that id? 
-- Signature <http://www.openbet.com/> Michal Borowiecki Senior Software Engineer L4 T: +44 208 742 1600 +44 203 249 8448 E: michal.borowie...@openbet.com W: www.openbet.com <http://www.openbet.com/> OpenBet Ltd Chiswick Park Building 9 566 Chiswick High Rd London W4 5XT UK <https://www.openbet.com/email_promo> This message is confidential and intended only for the addressee. If you have received this message in error, please immediately notify the postmas...@openbet.com <mailto:postmas...@openbet.com> and delete it from your system as well as any copies. The content of e-mails as well as traffic data may be monitored by OpenBet for employment and security purposes. To protect the environment please do not print this e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick Park Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A company registered in England and Wales. Registered no. 3134634. VAT no. GB927523612 -- Signature <http://www.openbet.com/> Michal Borowiecki Senior Software Engineer L4 T: +44 208 742 1600 +44 203 249 8448 E: michal.borowie...@openbet.com W: www.openbet.com <http://www.open
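A minimal sketch of the filtering pattern discussed above, with hypothetical topic names: the tracked ids arrive (e.g. from Kafka Connect) on their own topic and are read as a KTable keyed by id, and the event stream is then inner-joined against it, so events for untracked ids are simply dropped. As noted above, the marker values on the id topic must be non-null, since a null value is treated as a delete from the KTable.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Properties;

public class TrackedIdFilter {

    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        // topic of (id, non-null marker) records, e.g. populated by Kafka Connect
        KTable<String, String> trackedIds =
            builder.table(Serdes.String(), Serdes.String(), "tracked-ids", "tracked-ids-store");

        // event topic keyed by the same id
        KStream<String, String> events =
            builder.stream(Serdes.String(), Serdes.String(), "events");

        // inner join: events whose id has no entry in the KTable produce no output at all
        events.join(trackedIds, (event, marker) -> event)
              .to(Serdes.String(), Serdes.String(), "tracked-events");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "tracked-id-filter");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        new KafkaStreams(builder, props).start();
    }
}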
Re: Issue in Kafka running for few days
Ah, yes, you're right. I miss-read it. My bad. Apologies. Michal On 30/04/17 16:02, Svante Karlsson wrote: @michal My interpretation is that he's running 2 instances of zookeeper - not 6. (1 on the "4 broker machine" and one on the other) I'm not sure where that leaves you in zookeeper land - ie if you happen to have a timeout between the two zookeepers will you be out of service or will you have a split brain problem? None of the alternatives are good. That said - it should be visible in the logs. Anyway two zk is not a good config - stick to one or go to three. 2017-04-30 15:41 GMT+02:00 Michal Borowiecki <michal.borowie...@openbet.com <mailto:michal.borowie...@openbet.com>>: Hi Jan, Correct. As I said before it's not common or recommended practice to run an even number, and I wouldn't recommend it myself. I hope it didn't sound as if I did. However, I don't see how this would cause the issue at hand unless at least 3 out of the 6 zookeepers died, but that could also have happened in a 5 node setup. In either case, changing the number of zookeepers is not a prerequisite to progress debugging this issue further. Cheers, Michal On 30/04/17 13:35, jan wrote: I looked this up yesterday when I read the grandparent, as my old company ran two and I needed to know. Your link is a bit ambiguous but it has a link to the zookeeper Getting Started guide which says this: " For replicated mode, a minimum of three servers are required, and it is strongly recommended that you have an odd number of servers. If you only have two servers, then you are in a situation where if one of them fails, there are not enough machines to form a majority quorum. Two servers is inherently less stable than a single server, because there are two single points of failure. " <https://zookeeper.apache.org/doc/r3.4.10/zookeeperStarted.html> <https://zookeeper.apache.org/doc/r3.4.10/zookeeperStarted.html> cheers jan On 30/04/2017, Michal Borowiecki<michal.borowie...@openbet.com> <mailto:michal.borowie...@openbet.com> wrote: Svante, I don't share your opinion. Having an even number of zookeepers is not a problem in itself, it simply means you don't get any better resilience than if you had one fewer instance. Yes, it's not common or recommended practice, but you are allowed to have an even number of zookeepers and it's most likely not related to the problem at hand and does NOT need to be addressed first. https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_zkMulitServerSetup <https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_zkMulitServerSetup> Abhit, I'm afraid the log snippet is not enough for me to help. Maybe someone else in the community with more experience can recognize the symptoms but in the meantime, if you haven't already done so, you may want to search for similar issues: https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20text%20~%20%22ZK%20expired%3B%20shut%20down%20all%20controller%22 <https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20text%20%7E%20%22ZK%20expired%3B%20shut%20down%20all%20controller%22> searching for text like "ZK expired; shut down all controller" or "No broker in ISR is alive for" or other interesting events form the log. Hope that helps, Michal On 26/04/17 21:40, Svante Karlsson wrote: You are not supposed to run an even number of zookeepers. 
Fix that first On Apr 26, 2017 20:59, "Abhit Kalsotra"<abhit...@gmail.com> <mailto:abhit...@gmail.com> wrote: Any pointers please Abhi On Wed, Apr 26, 2017 at 11:03 PM, Abhit Kalsotra<abhit...@gmail.com> <mailto:abhit...@gmail.com> wrote: Hi * My kafka setup **OS: Windows Machine*6 broker nodes , 4 on one Machine and 2 on other Machine* **ZK instance on (4 broker nodes Machine) and another ZK on (2 broker nodes machine)* ** 2 Topics with partition size = 50 and replication factor = 3* I am producing on an average of around 500 messages / sec with each message size close to 98 bytes... More or less the message rate stays constant throughout, but after running the setup for close to 2 weeks , my Kafka cluster broke and this happened twice in a month. Not able to understand what's the issue, Kafka gurus please do share your inputs... the controlle.log file at the time of Kafka broken looks like *[2017-04-26 12:03:34,998] INFO [Controller 0]: Broker failure callback for 0,1,3,5,6 (kafka.controller.KafkaController)[2017-04-26 12:03:34,998] INFO [Controller 0]: Removed ArrayBuffer() from list of shut
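For reference, a minimal replicated ZooKeeper configuration along the "go to three" suggestion could look like the sketch below. Hostnames and paths are placeholders, and each server additionally needs a myid file under dataDir containing its own server id (1, 2 or 3).

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888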
Re: Controller connection failures
Hi Chuck, Are you running zookeepers in the same containers as Kafka brokers? Kafka brokers should be able to communicate with any of the zookeepers and, more importantly, zookeepers need to be able to talk to each-other. Therefore, the zookeeper port should be exposed too (2181 by default), otherwise you're not going to have a valid cluster. Docs on the controller are here: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Internals Let me know if that helped. Cheers, Michał On 27/04/17 19:08, Chuck Musser wrote: I'm running into a problem with a 3 broker cluster where, intermittently, one of the broker's controller begins to report that it cannot connect to the other brokers and repeatedly logs the failure. Each broker is running in its own Docker container on separate machines. These Docker containers have exposed 9092, which I think is sufficient for operation, but not sure. The log message are these: [2017-04-27 17:16:28,985] WARN [Controller-3-to-broker-2-send-thread], Controller 3's connection to broker 64174aa85d04:9092 (id: 2 rack: null) was unsuccessful (kafka.controller.RequestSendThread) java.io.IOException: Connection to 64174aa85d04:9092 (id: 2 rack: null) failed at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84) at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94) at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) [2017-04-27 17:16:28,986] WARN [Controller-3-to-broker-1-send-thread], Controller 3's connection to broker d4b8943ad4b5:9092 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread) java.io.IOException: Connection to d4b8943ad4b5:9092 (id: 1 rack: null) failed at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84) at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94) at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) This is Kafka 2.12-0.10.2.0. I'm wondering: 1. How do we figure out the cause of the connect failures? 2. What's the controller anyway? 3. Are there some command-line diagnostic tools for inspecting the health of the system? Thanks for any help, Chuck -- Signature <http://www.openbet.com/> Michal Borowiecki Senior Software Engineer L4 T: +44 208 742 1600 +44 203 249 8448 E: michal.borowie...@openbet.com W: www.openbet.com <http://www.openbet.com/> OpenBet Ltd Chiswick Park Building 9 566 Chiswick High Rd London W4 5XT UK <https://www.openbet.com/email_promo> This message is confidential and intended only for the addressee. If you have received this message in error, please immediately notify the postmas...@openbet.com <mailto:postmas...@openbet.com> and delete it from your system as well as any copies. The content of e-mails as well as traffic data may be monitored by OpenBet for employment and security purposes. To protect the environment please do not print this e-mail unless necessary. OpenBet Ltd. 
Registered Office: Chiswick Park Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A company registered in England and Wales. Registered no. 3134634. VAT no. GB927523612
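One thing worth checking in this setup: the controller is trying to reach "64174aa85d04:9092", which looks like a Docker container hostname that the other machines cannot resolve, so the brokers may be advertising addresses that are only valid inside their own containers. A hedged sketch of the relevant broker settings is below; host names are placeholders, and the point is to advertise an address reachable from the other machines and to list the full ZooKeeper ensemble.

# server.properties (per broker; host names are placeholders)
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
# the address other brokers and clients should use to reach this broker
advertised.listeners=PLAINTEXT://broker1.example.com:9092
# all zookeepers, so losing one does not orphan the broker
zookeeper.connect=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181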
Re: Caching in Kafka Streams to ignore garbage message
I have something working on the same principle (except not using connect), that is, I put ids to filter on into a ktable and then (inner) join a kstream with that ktable. I don't believe the value can be null though. In a changlog null value is interpreted as a delete so won't be put into a ktable. The RocksDB store, for one, does this: private void putInternal(byte[] rawKey,byte[] rawValue) { if (rawValue ==null) { try { db.delete(wOptions, rawKey); But any non-null value would do. Please correct me if miss-understood. Cheers, Michał On 27/04/17 22:44, Matthias J. Sax wrote: I'd like to avoid repeated trips to the db, and caching a large amount of data in memory. Lookups to the DB would be hard to get done anyway. Ie, it would not perform well, as all your calls would need to be synchronous... Is it possible to send a message w/ the id as the partition key to a topic, and then use the same id as the key, so the same node which will receive the data for an id is the one which will process it? That is what I did propose (maybe it was not clear). If you use Connect, you can just import the ID into Kafka and leave the value empty (ie, null). This reduced you cache data to a minimum. And the KStream-KTable join work as you described it :) -Matthias On 4/27/17 2:37 PM, Ali Akhtar wrote: I'd like to avoid repeated trips to the db, and caching a large amount of data in memory. Is it possible to send a message w/ the id as the partition key to a topic, and then use the same id as the key, so the same node which will receive the data for an id is the one which will process it? On Fri, Apr 28, 2017 at 2:32 AM, Matthias J. Sax <matth...@confluent.io> wrote: The recommended solution would be to use Kafka Connect to load you DB data into a Kafka topic. With Kafka Streams you read your db-topic as KTable and do a (inne) KStream-KTable join to lookup the IDs. -Matthias On 4/27/17 2:22 PM, Ali Akhtar wrote: I have a Kafka topic which will receive a large amount of data. This data has an 'id' field. I need to look up the id in an external db, see if we are tracking that id, and if yes, we process that message, if not, we ignore it. 99% of the data will be for ids which are not being tracked - 1% or so will be for ids which are tracked. My concern is, that there'd be a lot of round trips to the db made just to check the id, and if it'd be better to cache the ids being tracked somewhere, so other ids are ignored. I was considering sending a message to another (or the same topic) whenever a new id is added to the track list, and that id should then get processed on the node which will process the messages. Should I just cache all ids on all nodes (which may be a large amount), or is there a way to only cache the id on the same kafka streams node which will receive data for that id? -- Signature <http://www.openbet.com/> Michal Borowiecki Senior Software Engineer L4 T: +44 208 742 1600 +44 203 249 8448 E: michal.borowie...@openbet.com W: www.openbet.com <http://www.openbet.com/> OpenBet Ltd Chiswick Park Building 9 566 Chiswick High Rd London W4 5XT UK <https://www.openbet.com/email_promo> This message is confidential and intended only for the addressee. If you have received this message in error, please immediately notify the postmas...@openbet.com <mailto:postmas...@openbet.com> and delete it from your system as well as any copies. The content of e-mails as well as traffic data may be monitored by OpenBet for employment and security purposes. 
To protect the environment please do not print this e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick Park Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A company registered in England and Wales. Registered no. 3134634. VAT no. GB927523612
Re: Issue in Kafka running for few days
Hi Jan,
Correct. As I said before, it's not common or recommended practice to run an even number, and I wouldn't recommend it myself. I hope it didn't sound as if I did. However, I don't see how this would cause the issue at hand unless at least 3 out of the 6 zookeepers died, but that could also have happened in a 5-node setup. In either case, changing the number of zookeepers is not a prerequisite to progressing the debugging of this issue further (a minimal ensemble-sizing sketch follows at the end of this thread). Cheers, Michal

On 30/04/17 13:35, jan wrote:
I looked this up yesterday when I read the grandparent, as my old company ran two and I needed to know. Your link is a bit ambiguous, but it links to the ZooKeeper Getting Started guide, which says this:
"For replicated mode, a minimum of three servers are required, and it is strongly recommended that you have an odd number of servers. If you only have two servers, then you are in a situation where if one of them fails, there are not enough machines to form a majority quorum. Two servers is inherently less stable than a single server, because there are two single points of failure."
<https://zookeeper.apache.org/doc/r3.4.10/zookeeperStarted.html>
cheers
jan

On 30/04/2017, Michal Borowiecki <michal.borowie...@openbet.com> wrote:
Svante, I don't share your opinion. Having an even number of zookeepers is not a problem in itself; it simply means you don't get any better resilience than if you had one fewer instance. Yes, it's not common or recommended practice, but you are allowed to have an even number of zookeepers, it's most likely not related to the problem at hand, and it does NOT need to be addressed first.
https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_zkMulitServerSetup
Abhit, I'm afraid the log snippet is not enough for me to help. Maybe someone else in the community with more experience can recognize the symptoms, but in the meantime, if you haven't already done so, you may want to search for similar issues:
https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20text%20~%20%22ZK%20expired%3B%20shut%20down%20all%20controller%22
searching for text like "ZK expired; shut down all controller" or "No broker in ISR is alive for" or other interesting events from the log.
Hope that helps, Michal

On 26/04/17 21:40, Svante Karlsson wrote:
You are not supposed to run an even number of zookeepers. Fix that first.

On Apr 26, 2017 20:59, "Abhit Kalsotra" <abhit...@gmail.com> wrote:
Any pointers please
Abhi

On Wed, Apr 26, 2017 at 11:03 PM, Abhit Kalsotra <abhit...@gmail.com> wrote:
Hi,
My Kafka setup:
- OS: Windows
- 6 broker nodes, 4 on one machine and 2 on the other
- one ZK instance on the 4-broker machine and another ZK on the 2-broker machine
- 2 topics with 50 partitions each and replication factor = 3
I am producing on average around 500 messages/sec, with each message size close to 98 bytes. The message rate stays more or less constant throughout, but after running the setup for close to 2 weeks my Kafka cluster broke, and this has happened twice in a month. I'm not able to understand what the issue is; Kafka gurus, please do share your inputs. The controller.log file at the time the cluster broke looks like:
[2017-04-26 12:03:34,998] INFO [Controller 0]: Broker failure callback for 0,1,3,5,6 (kafka.controller.KafkaController)
[2017-04-26 12:03:34,998] INFO [Controller 0]: Removed ArrayBuffer() from list of shutting down brokers. (kafka.controller.KafkaController)
[2017-04-26 12:03:34,998] INFO [Partition state machine on Controller 0]: Invoking state change to OfflinePartition for partitions [__consumer_offsets,19],[mytopic,11],[__consumer_offsets,30],[mytopicOLD,18],[mytopic,13],[__consumer_offsets,47],[mytopicOLD,26],[__consumer_offsets,29],[mytopicOLD,0],[__consumer_offsets,41],[mytopic,44],[mytopicOLD,38],[mytopicOLD,2],[__consumer_offsets,17],[__consumer_offsets,10],[mytopic,20],[mytopic,23],[mytopic,30],[__consumer_offsets,14],[__consumer_offsets,40],[mytopic,31],[mytopicOLD,43],[mytopicOLD,19],[mytopicOLD,35],[__consumer_offsets,18],[mytopic,43],[__consumer_offsets,26],[__consumer_offsets,0],[mytopic,32],[__consumer_offsets,24],[mytopicOLD,3],[mytopic,2],[mytopic,3],[mytopicOLD,45],[mytopic,35],[__consumer_offsets,20],[mytopic,1],[mytopicOLD,33],[__consumer_offsets,5],[mytopicOLD,47],[__consumer_offsets,22],[mytopicOLD,8],[mytopic,33],[mytopic,36],[mytopicOLD,11],[mytopic,47],[mytopicOLD,20],[mytopic,48],[__consumer_offsets,12],[mytopicOLD,32],[__consumer_offsets,8],[mytopicOLD,39],[mytopicOLD,27],[mytopicOLD,49],[mytopicOLD,42],[mytopic,21],[mytopicOLD,31],[mytopic,29],[__consumer_offsets,23],[mytopicOLD,21],[__consumer_offsets,48],[__consumer_offsets,11],[mytopic,18],[__consumer_offsets,13],[mytopic,45],[mytopic,5],[mytopicOLD,25],[mytopic,6],[mytopicOLD,23],[mytopicOLD,37],[__consumer_offsets,6],[__consumer_offsets,
ker id 1 (kafka.controller.KafkaController)
[2017-04-26 12:03:35,045] DEBUG [Controller 1]: De-registering IsrChangeNotificationListener (kafka.controller.KafkaController)
[2017-04-26 12:03:35,060] INFO [Partition state machine on Controller 1]: Stopped partition state machine (kafka.controller.PartitionStateMachine)
[2017-04-26 12:03:35,060] INFO [Replica state machine on controller 1]: Stopped replica state machine (kafka.controller.ReplicaStateMachine)
[2017-04-26 12:03:35,060] INFO [Controller 1]: Broker 1 resigned as the controller (kafka.controller.KafkaController)
[2017-04-26 12:03:36,013] DEBUG [OfflinePartitionLeaderSelector]: No broker in ISR is alive for [__consumer_offsets,19]. Pick the leader from the alive assigned replicas: (kafka.controller.OfflinePartitionLeaderSelector)
[2017-04-26 12:03:36,029] DEBUG [OfflinePartitionLeaderSelector]: [mytopic,11]. Pick the leader from the alive assigned replicas: (kafka.controller.OfflinePartitionLeaderSelector)
[2017-04-26 12:03:36,029] DEBUG [OfflinePartitionLeaderSelector]: No broker in ISR is alive for [__consumer_offsets,30]. Pick the leader from the alive assigned replicas: (kafka.controller.OfflinePartitionLeaderSelector)
[2017-04-26 12:03:37,811] DEBUG [OfflinePartitionLeaderSelector]: Some broker in ISR is alive for [mytopicOLD,18]. Select 2 from ISR 2 to be the leader. (kafka.controller.OfflinePartitionLeaderSelector)
Typical broker config attached. Please do share some valid inputs.
Abhi
--
If you can't succeed, call it version 1.0
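To make the majority-quorum point above concrete, here is a minimal sketch of a replicated ZooKeeper ensemble configuration. It assumes a three-node ensemble; the hostnames, ports and paths are placeholders, not taken from Abhi's actual setup.

# Quorum is a strict majority of the ensemble: 3 nodes tolerate 1 failure,
# 5 tolerate 2, and 6 also tolerate only 2 - the extra (even) node buys no
# additional resilience, which is why odd ensemble sizes are recommended.
# Hostnames zk1/zk2/zk3 and dataDir are placeholders.
cat > zoo.cfg <<'EOF'
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888
EOF
# Each node also needs a myid file matching its server.N line, e.g. on zk1:
echo 1 > /var/lib/zookeeper/myid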
Re: topics stuck in "Leader: -1" after crash while migrating topics
Hi James,
This "Cached zkVersion [x] not equal to that in zookeeper" issue bit us once in production and I found these tickets to be relevant:
KAFKA-2729 <https://issues.apache.org/jira/browse/KAFKA-2729>
KAFKA-3042 <https://issues.apache.org/jira/browse/KAFKA-3042>
KAFKA-3083 <https://issues.apache.org/jira/browse/KAFKA-3083>
Unfortunately, I don't believe there is a fix for it yet, or one in the making.
Thanks, Michał

On 28/04/17 19:26, James Brown wrote:
For what it's worth, shutting down the entire cluster and then restarting it did address this issue. I'd love anyone's thoughts on what the "correct" fix would be here.

On Fri, Apr 28, 2017 at 10:58 AM, James Brown <jbr...@easypost.com> wrote:
The following is also appearing in the logs a lot, if anyone has any ideas:
INFO Partition [easypost.syslog,7] on broker 1: Cached zkVersion [647] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)

On Fri, Apr 28, 2017 at 10:43 AM, James Brown <jbr...@easypost.com> wrote:
We're running 0.10.1.0 on a five-node cluster. I was in the process of migrating some topics from two replicas to three when two of the five machines in this cluster crashed (brokers 2 and 3). After restarting them, all of the topics that were previously assigned to them are unavailable and showing "Leader: -1". Example kafka-topics output:
% kafka-topics.sh --zookeeper $ZK_HP --describe --unavailable-partitions
Topic: __consumer_offsets Partition: 9 Leader: -1 Replicas: 3,2,4 Isr:
Topic: __consumer_offsets Partition: 13 Leader: -1 Replicas: 3,2,4 Isr:
Topic: __consumer_offsets Partition: 17 Leader: -1 Replicas: 3,2,5 Isr:
Topic: __consumer_offsets Partition: 23 Leader: -1 Replicas: 5,2,1 Isr:
Topic: __consumer_offsets Partition: 25 Leader: -1 Replicas: 3,2,5 Isr:
Topic: __consumer_offsets Partition: 26 Leader: -1 Replicas: 3,2,1 Isr:
Topic: __consumer_offsets Partition: 30 Leader: -1 Replicas: 3,1,2 Isr:
Topic: __consumer_offsets Partition: 33 Leader: -1 Replicas: 1,2,4 Isr:
Topic: __consumer_offsets Partition: 35 Leader: -1 Replicas: 1,2,5 Isr:
Topic: __consumer_offsets Partition: 39 Leader: -1 Replicas: 3,1,2 Isr:
Topic: __consumer_offsets Partition: 40 Leader: -1 Replicas: 3,4,2 Isr:
Topic: __consumer_offsets Partition: 44 Leader: -1 Replicas: 3,1,2 Isr:
Topic: __consumer_offsets Partition: 45 Leader: -1 Replicas: 1,3,2 Isr:
Note that I wasn't even moving any of the __consumer_offsets partitions, so I'm not sure whether the fact that a reassignment was in progress is a red herring or not.
The logs are full of
ERROR [ReplicaFetcherThread-0-3], Error for partition [tracking.syslog,2] to broker 3:org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request (kafka.server.ReplicaFetcherThread)
ERROR [ReplicaFetcherThread-0-3], Error for partition [tracking.syslog,2] to broker 3:org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request (kafka.server.ReplicaFetcherThread)
ERROR [ReplicaFetcherThread-0-3], Error for partition [epostg.request_log_v1,0] to broker 3:org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request (kafka.server.ReplicaFetcherThread)
ERROR [ReplicaFetcherThread-0-3], Error for partition [epostg.request_log_v1,0] to broker 3:org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request (kafka.server.ReplicaFetcherThread)
What can I do to fix this? Should I manually reassign every partition that was led by broker 2 or 3 so that its replica set contains only the remaining broker from its original replica set? Do I need to temporarily enable unclean elections? I've never seen a cluster fail this way...
--
James Brown
Engineer
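For reference, here is a hedged sketch of what the two options James asks about could look like: a manual partition reassignment and the unclean-leader-election setting. The file name reassign.json is made up, and the target replica list simply copies the one surviving replica (broker 4) from the __consumer_offsets partition 9 line above; the thread itself does not recommend either step.

# Sketch only: shrink one stuck partition's replica set to its surviving broker.
cat > reassign.json <<'EOF'
{
  "version": 1,
  "partitions": [
    { "topic": "__consumer_offsets", "partition": 9, "replicas": [4] }
  ]
}
EOF
kafka-reassign-partitions.sh --zookeeper $ZK_HP --reassignment-json-file reassign.json --execute
kafka-reassign-partitions.sh --zookeeper $ZK_HP --reassignment-json-file reassign.json --verify

# Unclean leader election (availability over durability - it may lose messages)
# is controlled by this broker setting in server.properties:
# unclean.leader.election.enable=true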
Re: [VOTE] 0.10.2.1 RC3
It's listed below:
* Maven artifacts to be voted upon: https://repository.apache.org/content/groups/staging/

On 22/04/17 19:23, Shimi Kiviti wrote:
Is there a maven repo with these jars so I can test it against our kafka streams services?

On Sat, Apr 22, 2017 at 9:05 PM, Eno Thereska <eno.there...@gmail.com> wrote:
+1 tested the usual streams tests as before. Thanks Eno

On 21 Apr 2017, at 17:56, Gwen Shapira <g...@confluent.io> wrote:
Hello Kafka users, developers, friends, romans, countrypersons,
This is the fourth (!) candidate for release of Apache Kafka 0.10.2.1. It is a bug fix release, so we have lots of bug fixes, some super important.
Release notes for the 0.10.2.1 release: http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc3/RELEASE_NOTES.html
*** Please download, test and vote by Wednesday, April 26, 2017 ***
Kafka's KEYS file containing PGP keys we use to sign the release: http://kafka.apache.org/KEYS
* Release artifacts to be voted upon (source and binary): http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc3/
* Maven artifacts to be voted upon: https://repository.apache.org/content/groups/staging/
* Javadoc: http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc3/javadoc/
* Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.1 tag: https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=8e4f09caeaa877f06dc75c7da1af7a727e5e599f
* Documentation: http://kafka.apache.org/0102/documentation.html
* Protocol: http://kafka.apache.org/0102/protocol.html
Your help in validating this bugfix release is super valuable, so please take the time to test and vote!
Suggested tests:
* Grab the source archive and make sure it compiles
* Grab one of the binary distros and run the quickstarts against them
* Extract and verify one of the site docs jars
* Build a sample against jars in the staging repo
* Validate GPG signatures on at least one file
* Validate the javadocs look ok
* The 0.10.2 documentation was updated for this bugfix release (especially upgrade, streams and connect portions) - please make sure it looks ok: http://kafka.apache.org/documentation.html
But above all, try to avoid finding new bugs - we want to get this release out the door already :P
Thanks,
Gwen
--
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter <https://twitter.com/ConfluentInc> | blog <http://www.confluent.io/blog>
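To make Shimi's test concrete, "build a sample against jars in the staging repo" usually means pointing a test project's build at the staging repository listed above. A hedged sketch of the pom.xml additions follows; the org.apache.kafka:kafka-streams coordinates are the usual ones, and the rest of the project's pom is assumed to exist already.

# Snippet to merge into an existing pom.xml (written to a scratch file here);
# the repository URL is the staging group from the vote mail above.
cat > pom-snippet.xml <<'EOF'
<repositories>
  <repository>
    <id>apache-staging</id>
    <url>https://repository.apache.org/content/groups/staging/</url>
  </repository>
</repositories>
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.2.1</version>
  </dependency>
</dependencies>
EOF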
Re: [VOTE] 0.10.2.1 RC0
FWIW, I upgraded without issue and noticed the speedup from KAFKA-4851/KAFKA-4876. +1 from me (non-binding)

On 12/04/17 02:06, Gwen Shapira wrote:
Wrong link :)
http://kafka.apache.org/documentation/#upgrade and http://kafka.apache.org/documentation/streams#streams_api_changes_0102

On Tue, Apr 11, 2017 at 5:57 PM, Gwen Shapira <g...@confluent.io> wrote:
FYI: I just updated the upgrade notes with Streams changes: http://kafka.apache.org/documentation/#gettingStarted

On Fri, Apr 7, 2017 at 5:12 PM, Gwen Shapira <g...@confluent.io> wrote:
Hello Kafka users, developers and client-developers,
This is the first candidate for the release of Apache Kafka 0.10.2.1. This is a bug fix release and it includes fixes and improvements from 24 JIRAs (including a few critical bugs). See the release notes for more details:
http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc0/RELEASE_NOTES.html
*** Please download, test and vote by Thursday, 13 April, 8am PT ***
Your help in validating this bugfix release is super valuable, so please take the time to test and vote!
A few notes:
1. There are missing "Notable Changes" in the docs: https://github.com/apache/kafka/pull/2824 I will review, merge and update the docs by Monday.
2. The last commit (KAFKA-4943 cherry-pick) did not pass system tests yet. We may need another RC if system tests fail tonight.
Suggested tests:
* Grab the source archive and make sure it compiles
* Grab one of the binary distros and run the quickstarts against them
* Extract and verify one of the site docs jars
* Build a sample against jars in the staging repo
* Validate GPG signatures on at least one file
* Validate the javadocs look ok
* Kafka's KEYS file containing PGP keys we use to sign the release: http://kafka.apache.org/KEYS
* Release artifacts to be voted upon (source and binary): http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc0/
* Maven artifacts to be voted upon: https://repository.apache.org/content/groups/staging
* Javadoc: http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc0/javadoc/
* Tag to be voted upon (off 0.10.0 branch) is the 0.10.0.1-rc0 tag: https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=d08115f05da0e39c7f75b45e05d6d14ad5baf71d
* Documentation: http://kafka.apache.org/0102/documentation.html
* Protocol: http://kafka.apache.org/0102/protocol.html
Thanks,
Gwen Shapira
--
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog
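For anyone following the suggested tests above, here is a rough sketch of the signature-validation and compile steps. The artifact file names and the extracted directory name are assumptions based on the RC0 directory listed above, so check the actual listing first; building the source release also needs a local Gradle install.

# Sketch of the "validate GPG signatures" and "make sure it compiles" steps;
# file names are guesses based on the RC0 URL above - verify against the listing.
wget http://kafka.apache.org/KEYS
gpg --import KEYS
wget http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc0/kafka-0.10.2.1-src.tgz
wget http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc0/kafka-0.10.2.1-src.tgz.asc
gpg --verify kafka-0.10.2.1-src.tgz.asc kafka-0.10.2.1-src.tgz
tar xzf kafka-0.10.2.1-src.tgz
cd kafka-0.10.2.1-src
gradle            # bootstrap the Gradle wrapper (needs Gradle installed locally)
./gradlew jar     # make sure the source compiles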