Re: Event sourcing with Kafka and Kafka Streams. How to deal with atomicity

2017-07-21 Thread Michal Borowiecki

With Kafka Streams you get those, and atomicity comes via its exactly-once semantics (new in 0.11).
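
For example, enabling it is a single config on the Streams application - a minimal sketch, assuming 0.11.0+ brokers; the application id and bootstrap servers below are only illustrative:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "event-sourcing-app");   // illustrative
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // illustrative
// each record is processed exactly once, making the read-process-write cycle atomic
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);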

Michał


On 21/07/17 14:51, Chris Richardson wrote:

Hi,

I like Kafka but I don't understand the claim that it can be used for Event
Sourcing (here <http://microservices.io/patterns/data/event-sourcing.html>
and here <https://martinfowler.com/eaaDev/EventSourcing.html>)

One part of the event sourcing is the ability to subscribe to events
published by aggregates and clearly Kafka works well there.

But the other part of Event Sourcing is "database" like functionality,
which includes

- findEventsByPrimaryKey() - needed to be able to reconstruct an
  aggregate from its events - the essence of event sourcing
- Atomic updates - for updating aggregates (findEventsByPrimaryKey() -
  business logic - insertNewEvents()) in order to handle this kind of
  scenario.

The approach we have taken is to implement event sourcing using a database
and Kafka.
For instance: see
https://blog.eventuate.io/2016/10/06/eventuate-local-event-sourcing-and-cqrs-with-spring-boot-apache-kafka-and-mysql/

Chris







Re: Kafka compatibility matrix needed

2017-07-18 Thread Michal Borowiecki

Have you seen this: http://kafka.apache.org/documentation.html#upgrade

Starting with version 0.10.2, Java clients (producer and consumer) 
have acquired the ability to communicate with older brokers. Version 
0.11.0 clients can talk to version 0.10.0 or newer brokers. However, 
if your brokers are older than 0.10.0, you must upgrade all the 
brokers in the Kafka cluster before upgrading your clients. Version 
0.11.0 brokers support 0.8.x and newer clients.
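
Spelled out against the versions asked about below, that rule gives roughly the following (brokers accept clients of their own version or older; only 0.10.2+ clients can also talk to older brokers, and then only down to 0.10.0):

            broker
client      0.8    0.9    0.10   0.11
0.8         yes    yes    yes    yes
0.9         no     yes    yes    yes
0.10        no     no     yes    yes
0.11        no     no     yes*   yes

* 0.11 (and 0.10.2) clients work with 0.10.0 or newer brokers; older clients need a broker at least as new as themselves.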

Hope that helps.

Cheers,

Michał


On 18/07/17 08:17, Sachin Mittal wrote:

Hi,
I would like some help/information on what client versions are compatible
with what broker versions in kafka.

Some table like this would be good

            server
client      0.8    0.9    0.10   0.11
0.8         yes    ?      ?      ?
0.9         ?      yes    ?      ?
0.10        ?      ?      yes    ?
0.11        ?      ?      ?      yes

So if question marks are filled it would be of great help.

Reason I am asking is many times we need to use other libraries/frameworks
to pull/push data from/into kafka and sometimes these support only a
particular version of clients.

Like right now I am trying to pull data from kafka via druid/tranquility
and they have clients of version 0.8.x implemented but my broker is running
0.10.x.

Also if such a table can be posted on kafka documentation page or github
page that would be great.

Thanks
Sachin







Re: Kafka Streams Usage Patterns

2017-06-24 Thread Michal Borowiecki

Hi all,

Another pattern I think is worth adding is a sliding-windowed message 
reordering and de-duplicating processor.


The outline I have in mind is based on this (just the timestamp would 
come from the record context - in this question the timestamp was in the 
body of the message):


https://stackoverflow.com/a/44345374/7897191
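
For illustration, a rough sketch of such a transformer - in-memory buffering only, so not fault-tolerant; a real implementation would back the buffer with a state store, and the window size and String types here are just placeholders:

import java.util.*;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

class ReorderDedupTransformer implements Transformer<String, String, KeyValue<String, String>> {

    private final long windowMs;
    private ProcessorContext context;
    // timestamp -> records seen at that timestamp, de-duplicated by key
    private final TreeMap<Long, Map<String, String>> buffer = new TreeMap<>();
    private final Set<String> seenKeys = new HashSet<>();  // note: never pruned in this sketch

    ReorderDedupTransformer(long windowMs) { this.windowMs = windowMs; }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        context.schedule(windowMs);  // stream-time punctuation (pre-KIP-138)
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        if (seenKeys.add(key)) {  // drop duplicates
            buffer.computeIfAbsent(context.timestamp(), t -> new LinkedHashMap<>()).put(key, value);
        }
        return null;  // nothing emitted inline; emission happens in punctuate, in timestamp order
    }

    @Override
    public KeyValue<String, String> punctuate(long streamTime) {
        // emit everything older than the sliding window boundary, in timestamp order
        SortedMap<Long, Map<String, String>> ready = buffer.headMap(streamTime - windowMs);
        for (Map<String, String> batch : ready.values()) {
            batch.forEach((k, v) -> context.forward(k, v));
        }
        ready.clear();
        return null;
    }

    @Override
    public void close() { }
}

It would be wired in with something like stream.transform(() -> new ReorderDedupTransformer(WINDOW_MS)) upstream of the rest of the topology.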

Please let me know if you have a better design for this?

Cheers,

Michal


On 27/05/17 21:16, Jay Kreps wrote:

This is great!

-Jay

On Sat, May 27, 2017 at 12:47 PM, Michal Borowiecki 
<michal.borowie...@openbet.com <mailto:michal.borowie...@openbet.com>> 
wrote:


Hi all,

I've updated the wiki page with a draft pattern for consecutively
growing time-windowed aggregations which was discussed some time
ago on this mailing list.

I'm yet to add the part that cleans up the stores using
punctuations. Stay tuned.


On a somewhat similar subject, I've been working to implement the
following requirements:

* transaction sums per customer session (simple, just extract
non-expired session-windowed aggregates from a SessionStore using
interactive queries)

* global transaction sums for all _/currently active/_ customer
sessions

The second bit proved non-trivial, because session-windowed
KTables (or any windowed KTables for that matter) don't notify
downstream when a window expires. And I can't use punctuate until
KIP-138 is implemented because stream time punctuation is no good
in this case (records can stop coming), reliable system time
punctuation would be needed.

Below is how I implemented this, I'm yet to test it thoroughly.

I wonder if anyone knows of an easier way of achieving the same.

If so, I'm looking forward to suggestions. If not, I'll add that
to the patterns wiki page too, in case someone else finds it useful.


builder
   .stream(/*key serde*/, /*transaction serde*/, "transaction-topic")

   .groupByKey(/*key serde*/, /*transaction serde*/)

   .aggregate(
       () -> /*empty aggregate*/,
       aggregator(),
       merger(),
       SessionWindows.with(SESSION_TIMEOUT_MS).until(SESSION_TIMEOUT_MS * 2),
       /* aggregate serde */,
       txPerCustomerSumStore()  // this store can be queried for per-customer session data
   )

   .toStream()

   // tombstones only come when a session is merged into a bigger session, so ignore them
   .filter((key, value) -> value != null)

   // the map/groupByKey/reduce operations below only propagate updates
   // to the _latest_ session per customer downstream

   .map((windowedCustomerId, agg) ->
       // this moves the timestamp from the windowed key into the value
       // so that we can group by customerId only and reduce to the later value
       new KeyValue<>(
           windowedCustomerId.key(),  // just customerId
           new WindowedAggsImpl(      // just like a tuple2 but with nicely named accessors: timestamp() and aggs()
               windowedCustomerId.window().end(),
               agg
           )
       )
   )
   .groupByKey(/*key serde*/, /*windowed aggs serde*/)  // key is just customerId
   .reduce(
       // take the later session value and ignore any older - downstream only cares about _current_ sessions
       (val, agg) -> val.timestamp() > agg.timestamp() ? val : agg,
       TimeWindows.of(SESSION_TIMEOUT_MS).advanceBy(SESSION_TIMOUT_DELAY_TOLERANCE_MS),
       "latest-session-windowed"
   )

   .groupBy((windowedCustomerId, timeAndAggs) ->
       // calculate totals with maximum granularity, which is per-partition
       new KeyValue<>(
           new Windowed<>(
               windowedCustomerId.key().hashCode() % PARTITION_COUNT_FOR_TOTALS,  // KIP-159 would come in handy here, to access the partition number instead
               windowedCustomerId.window()  // used in the interactive queries to pick the oldest not-yet-expired window
           ),
           timeAndAggs.aggs()
       ),
       new SessionKeySerde<>(Serdes.Integer()),
       /* aggregate serde */
   )

   .reduce(
       (val, agg) -> agg.add(val),
       (val, agg) -> agg.subtract(val),
       txTotalsStore()  // this store can be queried to get totals per partition for all active sessions
   );

builder.globalTable(
   new SessionKeySerde<>(Serdes.Integer()),
   /* aggregate serde */,
   changelogTopicForStore(TRANSACTION_TOTALS), "totals");
// this global table puts per-partition totals on every node, so that
// they can be easily summed for global totals, picking the oldest
// not-yet-expired window

TODO: put in StreamPartitioners (with the KTable.through variants added
in KAFKA-5045) to avoid re-partitioning where I know it's unnecessary.

The idea behind the % PARTITION_COUNT_FOR_TOTALS bit is that I
want to first do summation with max parallelism and minimize the
work needed downstream.

Re: ticketing system Design

2017-06-21 Thread Michal Borowiecki
If your business flow involves human actions, personally I would look at 
a business process engine like the open source camunda.


Even if you don't choose to use it in production, you can use it to 
prototype and evolve your design at the inception stage.


There's a simple to run example that integrates with kafka here:

https://github.com/flowing/flowing-retail

And a tutorial that involves a human action in the flow here (but no kafka):

https://docs.camunda.org/get-started/bpmn20/

(NB. My personal interest in camunda is for integrating it as a process 
manager/saga element in an event-sourced service at some point)


Cheers,
Michał

On 21/06/17 03:25, Tarun Garg wrote:

need some more input on this.


Kafka is a queue; it doesn't take any action.

The sender (producer) sends data to Kafka and consumers pull data from the Kafka queue,
so there is no assignment of data to any particular consumer.

If a process/human can't take an action, then Kafka can't help in this case.

Hope that answers it.


From: Abhimanyu Nagrath <abhimanyunagr...@gmail.com>
Sent: Monday, June 19, 2017 8:01 PM
To: users@kafka.apache.org
Subject: Re: ticketing system Design

Hi ,

Can anyone suggest where I can get answers to these types of
questions?


Regards,
Abhimanyu

On Thu, Jun 8, 2017 at 6:49 PM, Abhimanyu Nagrath <
abhimanyunagr...@gmail.com> wrote:


Hi ,

Can Apache Kafka along with Storm be used to design a ticketing system?
By ticketing system, I mean that there are millions of tasks stored in
Kafka queues and there are processes/humans to take some actions on each
task. There are some constraints: the same task should not be assigned to
two processes/humans, and if a task flows to a process/human and no action
is performed it should be reassigned.
I am not sure whether this can be solved using Kafka. Any help is
appreciated.



Regards,
Abhimanyu







Re: Kafka Streams: Problems implementing rate limiting processing step

2017-06-20 Thread Michal Borowiecki

Thanks, maybe I misread it:

I then tried implementing my own scheduling that periodically sends/clears out
messages using the ProcessorContext provided to the aforementioned transform step.

I understood it to say it inspects a state store, sends the messages
that should be sent and removes them from the store. I might have read
too much into it though.


Cheers,

Michał


On 20/06/17 17:59, Matthias J. Sax wrote:

I didn't know you could write to state stores from outside a
processor/transformer.

You can't. And as far as I understand this thread, nobody said you can.
Did I miss something?


-Matthias

On 6/20/17 1:02 AM, Michal Borowiecki wrote:

I didn't know you could write to state stores from outside a
processor/transformer. Interesting to hear that it is working although
I'd be careful as KIP-67 warns it can introduce undefined behaviour:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams


Operations on state stores

The focus is on querying state stores, not updating them. It is not
clear what it would mean to update a state store from outside the
stream processing framework. Such updates are likely to introduce
undefined behavior to the framework.


The way I'd approach it until KIP-138 is released is to still use
punctuate() but to also use your own scheduling to send periodic "tick"
messages into the input topic. These messages can be ignored by the
Processor but will cause the stream time to advance reliably. Just need
to ensure they are distributed uniformly to all partitions.

I appreciate this is not an elegant workaround but this is what I've
settled for in the interim.

Cheers,

Michal


On 19/06/17 23:03, Steven Schlansker wrote:

On Jun 19, 2017, at 2:02 PM, Andre Eriksson <an...@tcell.io> wrote:

I then tried implementing my own scheduling that periodically sends/clears out 
messages using the ProcessorContext provided to the aforementioned transform 
step. However, it seems that when I call forward() from my scheduler (i.e. not 
in a process()/punctuate() call), I get a NullPointerException at 
ProcessorContextImpl.java:81 
(https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L81).
 I assume that this is because currentNode() is null outside of 
process()/punctuate() calls.

There may be more elegant or direct solutions, but if all else fails you could 
always consider producing to a topic rather than trying to forward directly, 
then you don't have to touch the relatively delicate Processor semantics.






Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL Permission of OffsetFetch

2017-06-20 Thread Michal Borowiecki

+1


On 19/06/17 21:31, Vahid S Hashemian wrote:

Thanks everyone. Great discussion.

Because these Read or Write actions are interpreted in conjunction with
particular resources (Topic, Group, ...) it would also make more sense to
me that for committing offsets the ACL should be (Group, Write).
So, a consumer would be required to have (Topic, Read), (Group, Write)
ACLs in order to function.

--Vahid




From:   Colin McCabe <cmcc...@apache.org>
To: users@kafka.apache.org
Date:   06/19/2017 11:01 AM
Subject:Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL
Permission of OffsetFetch



Thanks for the explanation.  I still think it would be better to have
the mutation operations require write ACLs, though.  It might not be
100% intuitive for novice users, but the current split between Describe
and Read is not intuitive for either novice or experienced users.

In any case, I am +1 on the incremental improvement discussed in
KIP-163.

cheers,
Colin


On Sat, Jun 17, 2017, at 11:11, Hans Jespersen wrote:

Offset commit is something that is done in the act of consuming (or
reading) Kafka messages.
Yes, technically it is a write to the Kafka consumer offset topic, but
it's much easier for administrators to think of ACLs in terms of whether
the user is allowed to write (Produce) or read (Consume) messages, and
not the lower-level semantics, which are that consuming is actually
reading AND writing (albeit only to the offset topic).

-hans





On Jun 17, 2017, at 10:59 AM, Viktor Somogyi

<viktor.somo...@cloudera.com> wrote:

Hi Vahid,

+1 for OffsetFetch from me too.

I also wanted to ask about the strangeness of the permissions, like why
OffsetCommit is a Read operation instead of Write, which would intuitively
make more sense to me. Perhaps an expert could shed some light on this? :)

Viktor

On Tue, Jun 13, 2017 at 2:38 PM, Vahid S Hashemian <
vahidhashem...@us.ibm.com <mailto:vahidhashem...@us.ibm.com>> wrote:


Hi Michal,

Thanks a lot for your feedback.

Your statement about Heartbeat is fair and makes sense. I'll update the
KIP accordingly.

--Vahid




From:    Michal Borowiecki <michal.borowie...@openbet.com>
To:users@kafka.apache.org, Vahid S Hashemian <
vahidhashem...@us.ibm.com>, d...@kafka.apache.org
Date:06/13/2017 01:35 AM
Subject:Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL
Permission of OffsetFetch
--



Hi Vahid,

+1 wrt OffsetFetch.

The "Additional Food for Thought" mentions Heartbeat as a

non-mutating

action. I don't think that's true as the GroupCoordinator updates the
latestHeartbeat field for the member and adds a new object to the
heartbeatPurgatory, see completeAndScheduleNextHeartbeatExpiration()
called from handleHeartbeat()

NB added dev mailing list back into CC as it seems to have been lost

along

the way.

Cheers,

Michał


On 12/06/17 18:47, Vahid S Hashemian wrote:
Hi Colin,

Thanks for the feedback.

To be honest, I'm not sure either why Read was selected instead of Write
for mutating APIs in the initial design (I asked Ewen on the corresponding
JIRA and he seemed unsure too).
Perhaps someone who was involved in the design can clarify.

Thanks.
--Vahid




From:   Colin McCabe <cmcc...@apache.org>
To: users@kafka.apache.org

Date:   06/12/2017 10:11 AM
Subject:Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL
Permission of OffsetFetch



Hi Vahid,

I think you make a valid point that the ACLs controlling group
operations are not very intuitive.

This is probably a dumb question, but why are we using Read for mutating
APIs?  Shouldn't that be Write?

The distinction between Describe and Read makes a lot of sense for
Topics.  A group isn't really something that you "read" from in the same
way as a topic, so it always felt kind of weird there.

best,
Colin


On Thu, Jun 8, 2017, at 11:29, Vahid S Hashemian wrote:

Hi all,

I'm resending my earlier note hoping it would spark some conversation
this
time around :)

Thanks.
--Vahid




From:   "Vahid S Hashemian" *<vahidhashem...@us.ibm.com <

mailto:vahidhashem...@us.ibm.com>>*

<vahidhashem...@us.ibm.com <mailto:vahidhashem...@us.ibm.com>>
To: dev *<d...@kafka.apache.org <mailto:d...@kafka.apache.org>>*

<d...@kafka.apache.org <mailto:d...@kafka.apache.org>>, "Kafka User"

*<users@kafka.apache.org <mailto:users@kafka.apache.org>>*

<users@kafka.apache.org <mailto:users@kafka.apache.org>>

Date:   05/30/2017 08:33 AM
Subject:KIP-163: Lower the Minimum Required ACL Permission of
OffsetFetch



Hi,

I started a new KIP to improve the minimum required ACL permissions of
some of the APIs.

Re: Kafka Streams: Problems implementing rate limiting processing step

2017-06-20 Thread Michal Borowiecki
I didn't know you could write to state stores from outside a 
processor/transformer. Interesting to hear that it is working although 
I'd be careful as KIP-67 warns it can introduce undefined behaviour:


https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams


Operations on state stores

The focus is on querying state stores, not updating them. It is not 
clear what it would mean to update a state store from outside the 
stream processing framework. Such updates are likely to introduce 
undefined behavior to the framework.




The way I'd approach it until KIP-138 is released is to still use 
punctuate() but to also use your own scheduling to send periodic "tick" 
messages into the input topic. These messages can be ignored by the 
Processor but will cause the stream time to advance reliably. Just need 
to ensure they are distributed uniformly to all partitions.
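
For what it's worth, a sketch of the scheduled "tick" producer side - the topic name, interval and serializers here are just placeholders, and the Streams Processor is assumed to recognise and discard the "tick" value:

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
    int partitions = producer.partitionsFor("input-topic").size();
    for (int p = 0; p < partitions; p++) {
        // one tick per partition so every stream task sees its stream time advance
        producer.send(new ProducerRecord<>("input-topic", p, "tick", "tick"));
    }
}, 0, 1, TimeUnit.SECONDS);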


I appreciate this is not an elegant workaround but this is what I've 
settled for in the interim.


Cheers,

Michal


On 19/06/17 23:03, Steven Schlansker wrote:

On Jun 19, 2017, at 2:02 PM, Andre Eriksson <an...@tcell.io> wrote:

I then tried implementing my own scheduling that periodically sends/clears out 
messages using the ProcessorContext provided to the aforementioned transform 
step. However, it seems that when I call forward() from my scheduler (i.e. not 
in a process()/punctuate() call), I get a NullPointerException at 
ProcessorContextImpl.java:81 
(https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L81).
 I assume that this is because currentNode() is null outside of 
process()/punctuate() calls.

There may be more elegant or direct solutions, but if all else fails you could 
always consider producing to a topic rather than trying to forward directly, 
then you don't have to touch the relatively delicate Processor semantics.







Re: Kafka 0.11 transactions API question

2017-06-19 Thread Michal Borowiecki
I'm not sure if I understood correctly, but if you want to integrate a 
single kafka producer transaction (or any transaction manager that only 
supports local transactions) into a distributed transaction, I think you 
can do so as long as all other involved transaction managers support 
2-phase commit. In other words, you can include one (and only one) 
local-only transaction into a distributed transaction.


The steps the distributed transaction coordinator would have to take to 
commit the transaction would be (a rough sketch in code follows the list):


1. call prepare on each of the transaction participants that support
   2-phase commit
2. if any of them fails, abort all transactions, otherwise, proceed
3. call commit on the one transaction participant that does not support
   2-phase commit (kafka producer in this case)
4. if that fails, abort all transactions, otherwise, proceed
5. call commit on all the transaction participants that support 2-phase
   commit (since prepare on these succeeded they should not refuse to
   commit at this point)
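
To spell that out, a rough sketch of the coordinator logic - TwoPhaseParticipant is a hypothetical interface standing in for the XA-capable resources, and error handling around the abort path is simplified:

import java.util.List;
import org.apache.kafka.clients.producer.KafkaProducer;

class DistributedTxCoordinatorSketch {

    // hypothetical wrapper around each participant that does support 2-phase commit
    interface TwoPhaseParticipant {
        void prepare();
        void commit();
        void abort();
    }

    boolean commitDistributed(List<TwoPhaseParticipant> xaParticipants,
                              KafkaProducer<?, ?> kafkaProducer) {
        try {
            // 1. prepare every participant that supports 2-phase commit
            for (TwoPhaseParticipant p : xaParticipants) {
                p.prepare();
            }
            // 3. commit the single participant that only supports local transactions
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            // 2./4. any failure up to here -> abort everything
            kafkaProducer.abortTransaction();
            xaParticipants.forEach(TwoPhaseParticipant::abort);
            return false;
        }
        // 5. the prepared participants should not refuse to commit at this point
        xaParticipants.forEach(TwoPhaseParticipant::commit);
        return true;
    }
}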

So as to your concern about getting "clearance" (I take it as the 
equivalent of the "prepare" call) from the kafka producer, you don't 
really need it IMO, as if commit fails on the kafka producer, you can 
still abort the remaining transactions.


Of course you can't do that if you have more than one transaction that 
doesn't support 2-phase commit in play.


Having said that, the advice these days seems to be to design 
distributed systems for eventual consistency, as using distributed 
transactions, while tempting, often leads to resource exhaustion as 
transaction managers have to go the extra mile to ensure they can commit 
any transaction that had prepare return successfully.


Just my 5c. I may be wrong in any of the above, please point it out if so.

Cheers,

Michał


On 19/06/17 14:57, Piotr Nowojski wrote:
Sorry for responding to my own message, but when I sent an original 
message/question I was not subscribed to this mailing list and now I 
can not respond to Matthias answer directly.


I don't want to share a transaction between multiple Producers 
threads/processes, I just would like to resume an interrupted 
transaction after a machine crash.


Let me try to phrase the problem differently:

From the perspective of a producer that writes to Kafka, we have the 
following situation:


We integrate the producer with transaction in another system. A number 
or records should go together atomically (a transaction). Before 
committing the transaction, we frequently need to ask for a 
"clearance" status, and if we get the "go ahead" we want to commit the 
transaction.


Unfortunately, as soon as we get that "clearance", we cannot reproduce 
the records any more (the are dropped from the original data stream).
If something fails between the "go ahead" and the committing, we need 
to retry the transaction, so we need to come up again with all 
records. As a result we have to persist the records before we start 
the write transaction. That is a painful overhead to pay, and a lot of 
additional operational complexity.


The simplest way to support that pattern without extra overhead would 
we could "resume" a transaction:


  - Each transaction as a unique Transaction ID
  - If a crash of the producer occurs, the transaction is NOT aborted 
automatically.
  - Instead, the restarted producer process reconnects to the 
transaction and decides to commit it or abort it.
  - The transaction timeout aborts the transaction after a while if 
inactivity.


Maybe this could be easily supported?

Thanks, Piotrek

2017-06-16 17:59 GMT+02:00 Piotr Nowojski <piotr.nowoj...@gmail.com 
<mailto:piotr.nowoj...@gmail.com>>:


But isn't it a low hanging fruit at this moment? Isn't that just
an API limitation and wouldn't the backend for transactions
support it with only minor changes to the API (do not fail
automatically dangling transactions on Producer restart)? Flushing
is already there so that _should_ handle the pre-commit. Again,
maybe I'm missing something and for sure I am not familiar with
    Kafka's internals.

Piotrek

2017-06-16 15:47 GMT+02:00 Michal Borowiecki
<michal.borowie...@openbet.com
<mailto:michal.borowie...@openbet.com>>:

I don't think KIP-98 is as ambitious as to provide support for
distributed transactions (2 phase commit).

It would be great if I was wrong though :P

Cheers,

Michał


On 16/06/17 14:21, Piotr Nowojski wrote:

Hi, I'm looking into Kafka's transactions API as proposed in
KIP-98. I've read both this KIP-98 document and I looked into
the code that is on the master branch. I would like to use it
to implement some two phase commit mechanism on top of the
Kafka's transactions, that would allow me to tie multiple
systems (some of them might not be

Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-18 Thread Michal Borowiecki
If confusion is the problem, then totally agree no point adding more 
knobs. Perhaps you're right that users don't /really/ want 
processing-time semantics. Just /think/ they want them until they start 
considering replay/catch-up scenarios. I guess people rarely think about 
those from the start (I sure didn't).


Cheers,

Michał


On 16/06/17 17:54, Jay Kreps wrote:
I think the question is when do you actually /want/ processing time 
semantics? There are definitely times when its safe to assume the two 
are close enough that a little lossiness doesn't matter much but it is 
pretty hard to make assumptions about when the processing time is and 
has been hard for us to think of a use case where its actually desirable.


I think mostly what we've seen is confusion about the core concepts:

  * stream -- immutable events that occur
  * tables (including windows) -- current state of the world

If the root problem is confusion adding knobs never makes it better. 
If the root problem is we're missing important use cases that justify 
the additional knobs then i think it's good to try to really 
understand them. I think there could be use cases around systems that 
don't take updates, example would be email, twitter, and some metrics 
stores.


One solution that would be less complexity inducing than allowing new 
semantics, but might help with the use cases we need to collect, would 
be to add a new operator in the DSL. Something like .freezeAfter(30, 
TimeUnit.SECONDS) that collects all updates for a given window and 
both emits and enforces a single output after 30 seconds after the 
advancement of stream time and remembers that it is omitted, 
suppressing all further output (so the output is actually a KStream). 
This might or might not depend on wall clock time. Perhaps this is in 
fact what you are proposing?


-Jay



On Fri, Jun 16, 2017 at 2:38 AM, Michal Borowiecki 
<michal.borowie...@openbet.com <mailto:michal.borowie...@openbet.com>> 
wrote:


I wonder if it's a frequent enough use case that Kafka Streams
should consider providing this out of the box - this was asked for
multiple times, right?

Personally, I agree totally with the philosophy of "no final
aggregation", as expressed by Eno's post, but IMO that is
predicated totally on event-time semantics.

If users want processing-time semantics then, as the docs already
point out, there is no such thing as a late-arriving record -
every record just falls in the currently open window(s), hence the
notion of final aggregation makes perfect sense, from the
usability point of view.

The single abstraction of "stream time" proves leaky in some cases
(e.g. for punctuate method - being addressed in KIP-138). Perhaps
this is another case where processing-time semantics warrant
explicit handling in the api - but of course, only if there's
sufficient user demand for this.

What I could imagine is a new type of time window
(ProcessingTimeWindow?), that if used in an aggregation, the
underlying processor would force the WallclockTimestampExtractor
(KAFKA-4144 enables that) and would use the system-time
punctuation (KIP-138) to send the final aggregation value once the
window has expired and could be configured to not send
intermediate updates while the window was open.

Of course this is just a helper for the users, since they can
implement it all themselves using the low-level API, as Matthias
pointed out already. Just seems there's recurring interest in this.

Again, this only makes sense for processing time semantics. For
event-time semantics I find the arguments for "no final
aggregation" totally convincing.


Cheers,

Michał


On 16/06/17 00:08, Matthias J. Sax wrote:

Hi Paolo,

This SO question might help, too:

https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable

<https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable>

For Streams, the basic model is based on "change" and we report updates
to the "current" result immediately reducing latency to a minimum.

Last, if you say it's going to fall into the next window, you won't get
event time semantics but you fall back processing time semantics, that
cannot provide exact results

If you really want to trade-off correctness version getting (late)
updates and want to use processing time semantics, you should configure
WallclockTimestampExtractor and implement a "update deduplication"
operator using table.toStream().transform(). You can attached a state to
your transformer and store all update there (ie, newer update overwrite
older updates). Punctuations allow you to emit "final" results for
wi

Re: Kafka 0.11 transactions API question

2017-06-16 Thread Michal Borowiecki
I don't think KIP-98 is as ambitious as to provide support for 
distributed transactions (2 phase commit).


It would be great if I was wrong though :P

Cheers,

Michał


On 16/06/17 14:21, Piotr Nowojski wrote:

Hi,

I'm looking into Kafka's transactions API as proposed in KIP-98. I've read
both this KIP-98 document and I looked into the code that is on the master
branch. I would like to use it to implement some two phase commit mechanism
on top of the Kafka's transactions, that would allow me to tie multiple
systems (some of them might not be Kafka) in one transaction.

Maybe I'm missing something but the problem is I don't see a way to
implement it using proposed Kafka's transactions API. Even if I have just
two processes writing to Kafka topics, I don't know how can I guarantee
that if one's transaction is committed, the other will also eventually be
committed. This is because if first KafkaProducer successfully commits, but
the second one fails before committing it's data, after restart the second
one's "initTransactions" call will (according to my understanding of the
API) abort previously non completed transactions.

Usually transactional systems expose API like this
<http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/>.
Namely there is a known identifier for a transaction and you can pre-commit
it (void prepare(...) method in before mentioned example) and then commit
or you can abort this transaction. Usually pre-commit involves flushing
stuff to some temporary files and commit move those files to the final
directory. In case of machine/process failure, if it was before
"pre-commit", we can just rollback all transactions from all of the
processes. However once every process acknowledge that it completed
"pre-commit", each process should call "commit". If some process fails at
that stage, after restarting this process, I would expect to be able to
restore it's "pre-committed" transaction (having remembered transaction's
id) and re attempt to commit it - which should be guaranteed to eventually
succeed.

In other words, it seems to me like the missing features of this API for me
are:
1. possibility to resume transactions after machine/process crash. At least
I would expect to be able to commit "flushed"/"pre-committed" data for such
transactions.
2. making sure that committing already committed transactions doesn't brake
anything

Or maybe there is some other way to integrate Kafka into such two phase
commit system that I'm missing?

Thanks, Piotrek







Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-16 Thread Michal Borowiecki
ploring Kafka Streams and it's very powerful imho even because the usage 
is pretty simple but this scenario could have a lack against Spark.


Thanks,

Paolo.


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>



From: Eno Thereska <eno.there...@gmail.com>
Sent: Thursday, June 15, 2017 1:45 PM
To: users@kafka.apache.org
Subject: Re: Kafka Streams vs Spark Streaming : reduce by window

Hi Paolo,

That is indeed correct. We don’t believe in closing windows in Kafka Streams.
You could reduce the number of downstream records by using record caches:
http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl

Alternatively you can just query the KTable whenever you want using the Interactive
Query APIs (so when you query dictates what data you receive), see
https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
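
For reference, the record-cache knobs are just two StreamsConfig settings added to the Properties passed to KafkaStreams - a larger cache plus a longer commit interval means fewer, more consolidated downstream updates (values below are only illustrative):

import org.apache.kafka.streams.StreamsConfig;

props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);  // 10 MB record cache
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);                      // flush downstream roughly every 5 s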

Thanks
Eno

On Jun 15, 2017, at 2:38 PM, Paolo Patierno <ppatie...@live.com> wrote:

Hi,


using the streams library I noticed a difference (or there is a lack of 
knowledge on my side) with Apache Spark.

Imagine following scenario ...


I have a source topic where numeric values come in and I want to check the 
maximum value in the latest 5 seconds but ... putting the max value into a 
destination topic every 5 seconds.

This is what happens with reduceByWindow method in Spark.

I'm using reduce on a KStream here that process the max value taking into 
account previous values in the latest 5 seconds but the final value is put into 
the destination topic for each incoming value.


For example ...


An application sends numeric values every 1 second.

With Spark ... the source gets values every 1 second, process max in a window 
of 5 seconds, puts the max into the destination every 5 seconds (so when the 
window ends). If the sequence is 21, 25, 22, 20, 26 the output will be just 26.

With Kafka Streams ... the source gets values every 1 second, process max in a 
window of 5 seconds, puts the max into the destination every 1 seconds (so 
every time an incoming value arrives). Of course, if for example the sequence 
is 21, 25, 22, 20, 26 ... the output will be 21, 25, 25, 25, 26.
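
In the Streams DSL that scenario corresponds to roughly the following sketch (0.10.2-style API; topic name and serdes are illustrative), which is why an updated max is emitted per incoming record rather than one result per window:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;

KStreamBuilder builder = new KStreamBuilder();
builder.stream(Serdes.String(), Serdes.Long(), "numbers")
       .groupByKey(Serdes.String(), Serdes.Long())
       // max over a 5-second window; an update is emitted for every incoming record
       .reduce((v1, v2) -> Math.max(v1, v2), TimeWindows.of(5000), "max-per-5s")
       .toStream()
       // with the input 21, 25, 22, 20, 26 this prints 21, 25, 25, 25, 26
       .foreach((windowedKey, max) -> System.out.println(windowedKey + " -> " + max));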


Is it possible with Kafka Streams? Or is it something to do at the application 
level?


Thanks,

Paolo


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>








Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL Permission of OffsetFetch

2017-06-13 Thread Michal Borowiecki

Hi Vahid,

+1 wrt OffsetFetch.

The "Additional Food for Thought" mentions Heartbeat as a non-mutating 
action. I don't think that's true as the GroupCoordinator updates the 
latestHeartbeat field for the member and adds a new object to the 
heartbeatPurgatory, see completeAndScheduleNextHeartbeatExpiration() 
called from handleHeartbeat()



NB added dev mailing list back into CC as it seems to have been lost 
along the way.


Cheers,

Michał


On 12/06/17 18:47, Vahid S Hashemian wrote:

Hi Colin,

Thanks for the feedback.

To be honest, I'm not sure either why Read was selected instead of Write
for mutating APIs in the initial design (I asked Ewen on the corresponding
JIRA and he seemed unsure too).
Perhaps someone who was involved in the design can clarify.

Thanks.
--Vahid




From:   Colin McCabe <cmcc...@apache.org>
To: users@kafka.apache.org
Date:   06/12/2017 10:11 AM
Subject:Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL
Permission of OffsetFetch



Hi Vahid,

I think you make a valid point that the ACLs controlling group
operations are not very intuitive.

This is probably a dumb question, but why are we using Read for mutating
APIs?  Shouldn't that be Write?

The distinction between Describe and Read makes a lot of sense for
Topics.  A group isn't really something that you "read" from in the same
way as a topic, so it always felt kind of weird there.

best,
Colin


On Thu, Jun 8, 2017, at 11:29, Vahid S Hashemian wrote:

Hi all,

I'm resending my earlier note hoping it would spark some conversation
this
time around :)

Thanks.
--Vahid




From:   "Vahid S Hashemian" <vahidhashem...@us.ibm.com>
To: dev <d...@kafka.apache.org>, "Kafka User"

<users@kafka.apache.org>

Date:   05/30/2017 08:33 AM
Subject:KIP-163: Lower the Minimum Required ACL Permission of
OffsetFetch



Hi,

I started a new KIP to improve the minimum required ACL permissions of
some of the APIs:


https://cwiki.apache.org/confluence/display/KAFKA/KIP-163%3A+Lower+the+Minimum+Required+ACL+Permission+of+OffsetFetch


The KIP is to address KAFKA-4585.

Feedback and suggestions are welcome!

Thanks.
--Vahid

















Re: Re: synchronous request response using kafka

2017-06-09 Thread Michal Borowiecki

cc-ed users mailing list, as I think it's more appropriate for this thread.

Sanjay, if what you're after is the following pattern:

http://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReplyJmsExample.html

then yes, you can do this in kafka. The outline would be similar to the 
JMS example above, but specifics of course different.


Also, currently you would have to put the reply topic name and 
correlation id into the msg value itself but from v0.11 you can use 
custom headers for that.
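
As a rough sketch of the requester side using the 0.11 headers (topic names and header keys here are just for illustration):

import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

void sendRequest(KafkaProducer<String, String> producer, String payload) {
    String correlationId = UUID.randomUUID().toString();
    ProducerRecord<String, String> request = new ProducerRecord<>("requests", correlationId, payload);
    // pre-0.11 these two would have to live inside the message value instead
    request.headers().add("reply-to", "replies-service-a".getBytes(StandardCharsets.UTF_8));
    request.headers().add("correlation-id", correlationId.getBytes(StandardCharsets.UTF_8));
    producer.send(request);
    // the requester consumes "replies-service-a" and matches responses on correlation-id,
    // e.g. completing a pending CompletableFuture keyed by the correlation id
}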


Hope that helps,
Michał


On 12/05/17 22:02, Colin McCabe wrote:

Hi Sanjay,

Can you be a little clearer what you are trying to achieve?  If you want
to build an RPC system where one entity makes a remote procedure call to
another, you might consider using something like CORBA, Apache Thrift,
gRPC, etc.

best,
Colin


On Fri, May 12, 2017, at 07:55, Banerjee, Sanjay wrote:

Can someone please share some thoughts whether we can do synchronous call
  (request response) using kafka similar to JMS

Thanks
Sanjay
913-221-9164







Re: Zookeeper on same server as Kafka

2017-06-04 Thread Michal Borowiecki
We are indeed running this setup in production, have been for almost 2 
years now, over a gradually increasing number of deployments.


Let me clarify though:

Our clusters don't exceed 5 nodes. We're not exercising Kafka anywhere 
near its limits, whether in bandwidth or disk I/O for that matter.


When we reach the point we want to scale Kafka independently from ZK, we 
will. The current setup is not set in stone. But for now everything is 
running under modest load, so we don't see the point of forgoing the 
simplicity of this until we actually have a need to scale further or put 
greater load on it. YMMV, not everyone is running at the same scale as 
Heroku ;-)


Thanks,

Michal


On 04/06/17 11:34, Tom Crayford wrote:

Hi,

I would not recommend running this kind of set up in production. Busy 
Kafka brokers use up a lot of disk and network bandwidth, which 
zookeeper does not deal well with. This means that a burst of traffic 
to 1 node carries the risk of disrupting the ZK ensemble.


Secondly, this will cause problems down the line, because you will 
want to scale Kafka independently from ZK. ZK gets slower as you add 
nodes, but Kafka can scale out for quite a while. For production 
clusters, I'd recommend to always have 5 ZK nodes, but for Kafka, you 
can scale well past that, or keep it small while you are starting out.


Thanks,

Tom Crayford
Heroku Kafka

On Sat, Jun 3, 2017 at 8:20 AM, Michal Borowiecki 
<michal.borowie...@openbet.com <mailto:michal.borowie...@openbet.com>> 
wrote:


I'm not an expert but I prefer keeping zookeepers on the same
hosts as kafka brokers and mimic each-others topology. The reason
is to minimize the chance of e.g. kafka brokers being able to talk
to one another but zookeepers not, or vice-versa. So, I'd say I
/do/ want my kafka broker and the co-located zookeeper to go down
together - for simplicity - prefer that to some asymmetric
failures to debug. This comes from past experience, albeit using
other technologies, when relying on 2 different clustering
mechanisms made failures in one but not the other very difficult to
debug.

Also, I think I read this advice somewhere a long time ago (don't
recall where) and it made sense to me (given the prior experience)
and we've never tried a different arrangement.

As to the overheads, I believe it's mostly disk IO and can
hopefully be addressed by separate disks for each but it's never
been a bottleneck for us, so can't really say.

Thanks,

Michał


On 02/06/17 21:47, Mohammed Manna wrote:

Usually, the overhead comes when you have kafka and zookeeper doing the
housekeeping (i.e. Disk IO) on the same server. ZK even suggests that you
should keep their logs on totally different physical machine for better
performance. Furthermore, if a mechanical failure occurs, you might not
want both zookeeper and broker going down together.

Can anyone else chime in for some better points?


On 2 Jun 2017 7:57 pm, "Meghana Narasimhan"<mnarasim...@bandwidth.com> 
<mailto:mnarasim...@bandwidth.com>
wrote:

Hi,
What are the pros and cons of setting up Zookeeper on the same server as
the Kafka broker ? Earlier offsets were being written to zookeeper which
was a major overhead but with offsets being written to Kafka now, what
other requirements should be taken into consideration for setting up
Zookeeper on the same server as Kafka vs having a separate zookeeper
cluster ?

Thanks,
Meghana




Re: Zookeeper on same server as Kafka

2017-06-03 Thread Michal Borowiecki
I'm not an expert but I prefer keeping zookeepers on the same hosts as 
kafka brokers and mimic each-others topology. The reason is to minimize 
the chance of e.g. kafka brokers being able to talk to one another but 
zookeepers not, or vice-versa. So, I'd say I /do/ want my kafka broker 
and the co-located zookeeper to go down together - for simplicity - 
prefer that to some asymmetric failures to debug. This comes from past 
experience, albeit using other technologies, when relying on 2 different 
clustering mechanisms made failures in one but not the other very 
difficult to debug.


Also, I think I read this advice somewhere a long time ago (don't recall 
where) and it made sense to me (given the prior experience) and we've 
never tried a different arrangement.


As to the overheads, I believe it's mostly disk IO and can hopefully be 
addressed by separate disks for each but it's never been a bottleneck 
for us, so can't really say.


Thanks,

Michał


On 02/06/17 21:47, Mohammed Manna wrote:

Usually, the overhead comes when you have kafka and zookeeper doing the
housekeeping (i.e. Disk IO) on the same server. ZK even suggests that you
should keep their logs on totally different physical machine for better
performance. Furthermore, if a mechanical failure occurs, you might not
want both zookeeper and broker going down together.

Can anyone else chime in for some better points?


On 2 Jun 2017 7:57 pm, "Meghana Narasimhan" <mnarasim...@bandwidth.com>
wrote:

Hi,
What are the pros and cons of setting up Zookeeper on the same server as
the Kafka broker ? Earlier offsets were being written to zookeeper which
was a major overhead but with offsets being written to Kafka now, what
other requirements should be taken into consideration for setting up
Zookeeper on the same server as Kafka vs having a separate zookeeper
cluster ?

Thanks,
Meghana







Re: Kafka Streams Usage Patterns

2017-05-27 Thread Michal Borowiecki

Hi all,

I've updated the wiki page with a draft pattern for consecutively 
growing time-windowed aggregations which was discussed some time ago on 
this mailing list.


I'm yet to add the part that cleans up the stores using punctuations. 
Stay tuned.



On a somewhat similar subject, I've been working to implement the 
following requirements:


* transaction sums per customer session (simple, just extract 
non-expired session-windowed aggregates from a SessionStore using 
interactive queries)


* global transaction sums for all _/currently active/_ customer sessions

The second bit proved non-trivial, because session-windowed KTables (or 
any windowed KTables for that matter) don't notify downstream when a 
window expires. And I can't use punctuate until KIP-138 is implemented 
because stream time punctuation is no good in this case (records can 
stop coming), reliable system time punctuation would be needed.


Below is how I implemented this, I'm yet to test it thoroughly.

I wonder if anyone knows of an easier way of achieving the same.

If so, I'm looking forward to suggestions. If not, I'll add that to the 
patterns wiki page too, in case someone else finds it useful.



builder
   .stream(/*key serde*/, /*transaction serde*/, "transaction-topic")

   .groupByKey(/*key serde*/, /*transaction serde*/)

   .aggregate(
       () -> /*empty aggregate*/,
       aggregator(),
       merger(),
       SessionWindows.with(SESSION_TIMEOUT_MS).until(SESSION_TIMEOUT_MS * 2),
       /* aggregate serde */,
       txPerCustomerSumStore()  // this store can be queried for per-customer session data
   )

   .toStream()

   // tombstones only come when a session is merged into a bigger session, so ignore them
   .filter((key, value) -> value != null)

   // the map/groupByKey/reduce operations below only propagate updates
   // to the _latest_ session per customer downstream

   .map((windowedCustomerId, agg) ->
       // this moves the timestamp from the windowed key into the value
       // so that we can group by customerId only and reduce to the later value
       new KeyValue<>(
           windowedCustomerId.key(),  // just customerId
           new WindowedAggsImpl(      // just like a tuple2 but with nicely named accessors: timestamp() and aggs()
               windowedCustomerId.window().end(),
               agg
           )
       )
   )
   .groupByKey(/*key serde*/, /*windowed aggs serde*/)  // key is just customerId
   .reduce(
       // take the later session value and ignore any older - downstream only cares about _current_ sessions
       (val, agg) -> val.timestamp() > agg.timestamp() ? val : agg,
       TimeWindows.of(SESSION_TIMEOUT_MS).advanceBy(SESSION_TIMOUT_DELAY_TOLERANCE_MS),
       "latest-session-windowed"
   )

   .groupBy((windowedCustomerId, timeAndAggs) ->
       // calculate totals with maximum granularity, which is per-partition
       new KeyValue<>(
           new Windowed<>(
               windowedCustomerId.key().hashCode() % PARTITION_COUNT_FOR_TOTALS,  // KIP-159 would come in handy here, to access the partition number instead
               windowedCustomerId.window()  // used in the interactive queries to pick the oldest not-yet-expired window
           ),
           timeAndAggs.aggs()
       ),
       new SessionKeySerde<>(Serdes.Integer()),
       /* aggregate serde */
   )

   .reduce(
       (val, agg) -> agg.add(val),
       (val, agg) -> agg.subtract(val),
       txTotalsStore()  // this store can be queried to get totals per partition for all active sessions
   );

builder.globalTable(
   new SessionKeySerde<>(Serdes.Integer()),
   /* aggregate serde */,
   changelogTopicForStore(TRANSACTION_TOTALS), "totals");
// this global table puts per-partition totals on every node, so that
// they can be easily summed for global totals, picking the oldest
// not-yet-expired window


TODO: put in StreamPartitioners (with KTable.through variants added in 
KAFKA-5045) to avoid re-partitioning where I know it's unnecessary.


The idea behind the % PARTITION_COUNT_FOR_TOTALS bit is that I want to 
first do summation with max parallelism and minimize the work needed 
downstream. So I calculate a per-partition sum first to limit the 
updates that the totals topic will receive and the summing work done by 
the interactive queries on the global store. Is this a good way of going 
about it?
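
For completeness, a rough sketch of what the interactive-query side of that could look like. This is only an illustration: the store name "totals" comes from the snippet above, but treating the aggregate as a plain Double and the helper name globalTotal are simplifying assumptions, not the actual code.

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.util.HashMap;
import java.util.Map;

public class GlobalTotalsQuery {

    // Sums, across all partition buckets, the aggregate of the oldest not-yet-expired window.
    public static double globalTotal(KafkaStreams streams, long sessionTimeoutMs) {
        ReadOnlyKeyValueStore<Windowed<Integer>, Double> store =
            streams.store("totals", QueryableStoreTypes.<Windowed<Integer>, Double>keyValueStore());

        long cutoff = System.currentTimeMillis() - sessionTimeoutMs;
        Map<Integer, KeyValue<Windowed<Integer>, Double>> oldestLivePerPartition = new HashMap<>();

        try (KeyValueIterator<Windowed<Integer>, Double> it = store.all()) {
            while (it.hasNext()) {
                KeyValue<Windowed<Integer>, Double> entry = it.next();
                if (entry.key.window().end() < cutoff) {
                    continue; // window already expired - ignore
                }
                KeyValue<Windowed<Integer>, Double> oldest = oldestLivePerPartition.get(entry.key.key());
                if (oldest == null || entry.key.window().start() < oldest.key.window().start()) {
                    oldestLivePerPartition.put(entry.key.key(), entry); // keep the oldest live window per partition bucket
                }
            }
        }

        double total = 0;
        for (KeyValue<Windowed<Integer>, Double> kv : oldestLivePerPartition.values()) {
            total += kv.value;
        }
        return total;
    }
}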


Thanks,

Michał


On 09/05/17 18:31, Matthias J. Sax wrote:

Hi,

I started a new Wiki page to collect some common usage patterns for
Kafka Streams.

Right now, it contains a quick example on "how to compute average". Hope
we can collect more example like this!

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns


-Matthias




Re: Partition assignment with multiple topics

2017-05-23 Thread Michal Borowiecki

Hi Mike,

Are you performing any operations (e.g. joins) across all topics?

If so, I'd think increasing the number of partitions is indeed the way 
to go. Partition is the unit of parallelism per topic and all topics are 
bound together in your app in that case.


If not, your other option is to break up your application into a number 
of KafkaStreams instances, each dealing with a subset of topics.
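
For illustration, a minimal sketch of that option: two independent KafkaStreams instances, each with its own application id and its own subset of topics, so their partitions are assigned independently. All topic names and ids are placeholders, and the pass-through topology just stands in for the real processing.

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Properties;

public class SplitTopologies {

    private static KafkaStreams buildInstance(String appId, String... topics) {
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream(topics).to("output-for-" + appId); // trivial pass-through in place of real processing

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); // separate app id => separate consumer group
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaStreams(builder, props);
    }

    public static void main(String[] args) {
        // each instance subscribes to its own subset of the topics
        buildInstance("app-subset-a", "topic-1", "topic-2").start();
        buildInstance("app-subset-b", "topic-3", "topic-4").start();
    }
}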


Hope that helps.
Michał

On 23/05/17 08:47, Mike Gould wrote:

Hi
We have a couple of hundred topics - each carrying a similar but distinct
message type but to keep the total partition count down each only has 3
partitions.

If I start Kafka-streams consuming all topics only 3 threads ever get
assigned any partitions.

I think the first thread to start gets the first partition of each topic,
and so on until the 3rd thread, after that all the partitions are assigned
- any further threads are just left idle.

Is there any way to make the partition assignment smarter and either add a
random element that moves partitions when further consumers start or
considers all the partitions of subscribed topics together when assigning
them?

Our only alternative is creating many more partitions for each topic - and
we're worried about how far this will scale.

Thank you
Mike G








Re: Order of punctuate() and process() in a stream processor

2017-05-17 Thread Michal Borowiecki
Hi Sini, 

This is beyond the scope of KIP-138, but 
https://issues.apache.org/jira/browse/KAFKA-3514 exists to track such 
improvements.

Thanks, 

Michal

On 17 May 2017 5:10 p.m., Peter Sinoros Szabo  
wrote:

Hi,

I see, now it's clear why the repeated punctuations use the same time value 
in that case.

Do you have a JIRA ticket to track improvement ideas for that?

It would be great to have an option to:
- advance the stream time before calling process() on a new record - this 
would prevent processing a message in the wrong punctuation "segment".
- use a fine-grained advance of stream time for the "missed" punctuations - 
this would ease the processing of burst messages after some silence. I do 
not see whether KIP-138 may solve this or not.

Regards

-Sini



From:   "Matthias J. Sax" 
To: users@kafka.apache.org
Date:   2017/05/12 19:19
Subject:Re: Order of punctuate() and process() in a stream 
processor



Thanks for sharing.

As punctuate is called with "streams time" you see the same time value
multiple times. It's again due to the coarse grained advance of "stream
time".

@Thomas: I think, the way we handle it just simplifies the
implementation of punctuations. I don't see any other "advantage".


I will create a JIRA to track this -- we are currently working on some
improvements of punctuation and time management already, and it seems to
be another valuable improvement.


-Matthias


On 5/12/17 10:07 AM, Peter Sinoros Szabo wrote:
> Well, this is also a good question, because it is triggered with the 
same 
> timestamp 3 times, so in order to create my update for both three 
seconds, 
> I will have to count the number of punctuations and calculate the missed 

> stream times for myself. It's ok for me to trigger it 3 times, but the 
> timestamp should not be the same in each, but should be increased by the 

> schedule time in each punctuate.
> 
> - Sini
> 
> 
> 
> From:   Thomas Becker 
> To: "users@kafka.apache.org" 
> Date:   2017/05/12 18:57
> Subject:RE: Order of punctuate() and process() in a stream 
> processor
> 
> 
> 
> I'm a bit troubled by the fact that it fires 3 times despite the stream 
> time being advanced all at once; is there a scenario when this is 
> beneficial?
> 
> 
> From: Matthias J. Sax [matth...@confluent.io]
> Sent: Friday, May 12, 2017 12:38 PM
> To: users@kafka.apache.org
> Subject: Re: Order of punctuate() and process() in a stream processor
> 
> Hi Peter,
> 
> It's by design. Streams internally tracks time progress (so-called
> "streams time"). "streams time" get advanced *after* processing a 
record.
> 
> Thus, in your case, "stream time" is still at its old value before it
> processed the first message of you send "burst". After that, "streams
> time" is advanced by 3 seconds, and thus, punctuate fires 3 time.
> 
> I guess, we could change the design and include scheduled punctuations
> when advancing "streams time". But atm, we just don't do this.
> 
> Does this make sense?
> 
> Is this critical for your use case? Or do you just want to understand
> what's happening?
> 
> 
> -Matthias
> 
> 
> On 5/12/17 8:59 AM, Peter Sinoros Szabo wrote:
>> Hi,
>>
>>
>> Let's assume the following case.
>> - a stream processor that uses the Processor API
>> - context.schedule(1000) is called in the init()
>> - the processor reads only one topic that has one partition
>> - using custom timestamp extractor, but that timestamp is just a wall
>> clock time
>>
>>
>> Image the following events:
>> 1., for 10 seconds I send in 5 messages / second
>> 2., does not send any messages for 3 seconds
>> 3., starts the 5 messages / second again
>>
>> I see that punctuate() is not called during the 3 seconds when I do not
>> send any messages. This is ok according to the documentation, because
>> there is not any new messages to trigger the punctuate() call. When the
>> first few messages arrives after a restart the sending (point 3. above) 

> I
>> see the following sequence of method calls:
>>
>> 1., process() on the 1st message
>> 2., punctuate() is called 3 times
>> 3., process() on the 2nd message
>> 4., process() on each following message
>>
>> What I would expect instead is that punctuate() is called first and 
then
>> process() is called on the messages, because the first message's 
> timestamp
>> is already 3 seconds older then the last punctuate() was called, so the
>> first message belongs after the 3 punctuate() calls.
>>
>> Please let me know if this is a bug or intentional, in this case what 
is
>> the reason for processing one message before punctuate() is called?
>>
>>
>> Thanks,
>> Peter
>>
>> Péter Sinóros-Szabó
>> Software Engineer
>>
>> Ustream, an IBM Company
>> Andrassy ut 39, H-1061 Budapest
>> Mobile: +36203693050
>> Email: peter.sinoros-sz...@hu.ibm.com
>>
> 
> 
> 
> This email 

Re: How to chain increasing window operations one after another

2017-05-09 Thread Michal Borowiecki

Just had a thought:

If you implement the Windowed/Tuple serde to store the timestamp(s) 
before the actual record key then you can simply periodically do a 
ranged query on each of the state stores to find and delete all data 
older than ... (using punctuate() inside a Processor).


Any downsides to that?
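
If it helps, below is a rough sketch of that cleanup idea against the current (pre-KIP-138) Processor API. The key layout (8-byte big-endian timestamp followed by the record key), the store name "store1m" and the retention constant are all assumptions for illustration, not the actual implementation.

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

import java.nio.ByteBuffer;

public class StoreCleanupProcessor extends AbstractProcessor<byte[], byte[]> {

    private static final long RETENTION_MS = 60 * 60 * 1000L; // placeholder retention

    private KeyValueStore<Bytes, byte[]> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        store = (KeyValueStore<Bytes, byte[]>) context.getStateStore("store1m");
        context.schedule(60 * 1000L); // stream-time punctuation, roughly once a minute
    }

    @Override
    public void process(final byte[] key, final byte[] value) {
        context().forward(key, value); // pass records through unchanged; this processor only does the cleanup
    }

    @Override
    public void punctuate(final long timestamp) {
        final long cutoff = timestamp - RETENTION_MS;
        final Bytes from = Bytes.wrap(ByteBuffer.allocate(8).putLong(0L).array());
        final Bytes to = Bytes.wrap(ByteBuffer.allocate(8).putLong(cutoff).array());
        // the ranged scan works because the timestamp prefix sorts first in the key
        try (KeyValueIterator<Bytes, byte[]> it = store.range(from, to)) {
            while (it.hasNext()) {
                store.delete(it.next().key); // drop everything older than the cut-off
            }
        }
    }
}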

Cheers,

Michał


On 09/05/17 09:17, Michal Borowiecki wrote:

Hi Matthias,

Yes, the ever growing stores were my concern too. That was the 
intention behind my TODO note in the first reply just didn't want to 
touch on this until I've dug deeper into it.


I understand compaction+retention policy on the backing changelog 
topics takes care of cleaning up on the broker-side but Rocks dbs will 
grow indefinitely, right? (until re-balanced?)



Punctuation was the first idea that came to my mind too when I 
originally faced this problem on my project. However, as you said, it's 
only on KStream and aggregations on KStream actually discard 
tombstones and don't forward them on to the KTable:


https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java#L798-L799 



 * Aggregate the values of records in this stream by the grouped key.
 * Records with {@code null} key or value are ignored.

I haven't come up with a satisfactory solution yet, but it's still on 
my mind.



TTLs on stores could potentially solve this issue and just today they 
were asked about on SO: 
http://stackoverflow.com/questions/43860114/kafka-streams-low-level-processor-api-rocksdb-timetolivettl/43862922#43862922


Garrett, was that you? :-)


Thanks,

Michał


On 08/05/17 23:29, Matthias J. Sax wrote:

Thinking about this once more (and also having a fresh memory of another
thread about KTables), I am wondering if this approach needs some extra
tuning:

As the result of the first window aggregation produces an output stream
with unbounded key space, the following (non-windowed) KTables would
grow indefinitely, if I don't miss anything.

Thus, it might be required to put a transform() that only forwards all
data 1-to-1, but additionally registers a punctuation schedule. When
punctuation is called, it would be required to send tombstone messages
downstream (or a simliar) that deletes windows that are older than the
retention time. Sound tricky to implement though... `transform()` would
need to keep track of used keys to send appropriate tombstones in an
custom state. Also. `transform` is only available for KStream and
transforming (windowed) KTable into KStream into KTable while preserving
the required semantics seems not to be straight forwards.

Any thoughts about this potential issue?


-Matthias


On 5/8/17 3:05 PM, Garrett Barton wrote:

Michael,
   This is slick!  I am still writing unit tests to verify it.  My code
looks something like:

KTable<Windowed, CountSumMinMaxAvgObj> oneMinuteWindowed =
srcStream// my val object isnt really called that, just wanted 
to show

a sample set of calculations the value can do!
 .groupByKey(Serdes.String(), Serdes.Double())
 .aggregate(/*initializer */, /* aggregator */, 
TimeWindows.of(60*1000,

60*1000), "store1m");

// i used an aggregate here so I could have a non-primitive 
value object
that does the calculations on each aggregator, pojo has an 
.add(Double) in

it.
KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> fiveMinuteWindowed =
oneMinuteWindowed// I made my own Tuple2, will move window calc 
into it
 .groupBy( (windowedKey, value) -> new KeyValue<>(new 
Tuple2<String,
Long>(windowedKey.key(), windowedKey.window().end() /1000/60/5 
*1000*60*5),

value, keySerde, valSerde)

 // the above rounds time down to a timestamp divisible by 5 
minutes


 .reduce(/*your adder*/, /*your subtractor*/, "store5m");

 // where your subtractor can be as simple as (val, agg) -> 
agg - val

for primitive types or as complex as you need,

 // just make sure you get the order right (lesson hard 
learnt ;) ),

subtraction is not commutative!

 // again my val object has an .add(Obj) and a .sub() to handle
this, so nice!


KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> 
fifteenMinuteWindowed =

fiveMinuteWindowed

 .groupBy( (keyPair, value) -> new KeyValue<>(new 
Tuple2(keyPair._1,

keyPair._2 /1000/60/15 *1000*60*15), value, keySerde, valSerde)

 // the above rounds time down to a timestamp divisible by 
15 minutes


 .reduce(/*your adder*/, /*your subtractor*/, "store15m");


KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> 
sixtyMinuteWindowed =

fifteeenMinuteWindowed

 .groupBy( (keyPair, value) -> new KeyValue<>(new 
Tuple2(keyPairair._1,

pair._2 /1000/60/60 *1000*60*60), value, keySerde, valSerde)

 // the above rounds time down to a timestamp divisible by 
60 minutes


 .reduce(/*your adder*/, /*your subtractor*/, "

Re: How to chain increasing window operations one after another

2017-05-09 Thread Michal Borowiecki
.
   Serdes gets nuts as well as the Generic typing on some of these classes
(yea you KeyValueMapper), makes for long code!  I had to specify them
everywhere since the key/val's changed.


I didn't get enough time to mess with it today, I will wrap up the unit
tests and run it to see how it performs against my real data as well
tomorrow.  I expect a huge reduction in resources (both streams and kafka
storage) by moving to this.
Thank you!



On Mon, May 8, 2017 at 5:26 PM, Matthias J. Sax <matth...@confluent.io>
wrote:


Michal,

that's an interesting idea. In an ideal world, Kafka Streams should have
an optimizer that is able to do this automatically under the hood. Too
bad we are not there yet.

@Garret: did you try this out?

This seems to be a question that might affect many users, and it might
be worth documenting it somewhere as a recommended pattern.


-Matthias


On 5/8/17 1:43 AM, Michal Borowiecki wrote:

Apologies,

In the code snippet of course only oneMinuteWindowed KTable will have a
Windowed key (KTable<Windowed, Value>), all others would be just
KTable<Tuple2<Key, Long>, Value>.

Michał

On 07/05/17 16:09, Michal Borowiecki wrote:

Hi Garrett,

I've encountered a similar challenge in a project I'm working on (it's
still work in progress, so please take my suggestions with a grain of
salt).

Yes, I believe KTable.groupBy lets you accomplish what you are aiming
for with something like the following (same snippet attached as txt

file):


KTable<Windowed, Value> oneMinuteWindowed = yourStream//
where Key and Value stand for your actual key and value types

 .groupByKey()

 .reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000),

"store1m");

 //where your adder can be as simple as (val, agg) -> agg + val

 //for primitive types or as complex as you need


KTable<Windowed<Tuple2<Key, Long>>, Value> fiveMinuteWindowed =
oneMinuteWindowed// Tuple2 for this example as defined by
javaslang library

 .groupBy( (windowedKey, value) -> new KeyValue<>(new
Tuple2<>(windowedKey.key(), windowedKey.window().end() /1000/60/5
*1000*60*5), value)

 // the above rounds time down to a timestamp divisible by 5
minutes

 .reduce(/*your adder*/, /*your subtractor*/, "store5m");

 // where your subtractor can be as simple as (val, agg) -> agg
- valfor primitive types or as complex as you need,

 // just make sure you get the order right (lesson hard learnt
;) ), subtraction is not commutative!


KTable<Windowed<Tuple2<Key, Long>>, Value> fifteenMinuteWindowed =
fiveMinuteWindowed

 .groupBy( (keyPair, value) -> new KeyValue<>(new
Tuple2(keyPair._1, keyPair._2/1000/60/15 *1000*60*15), value)

 // the above rounds time down to a timestamp divisible by 15
minutes

 .reduce(/*your adder*/, /*your subtractor*/, "store15m");


KTable<Windowed<Tuple2<Key, Long>>, Value> sixtyMinuteWindowed =
fifteeenMinuteWindowed

 .groupBy( (keyPair, value) -> new KeyValue<>(new
Tuple2(keyPairair._1, pair._2 /1000/60/60 *1000*60*60), value)

 // the above rounds time down to a timestamp divisible by 5
minutes

 .reduce(/*your adder*/, /*your subtractor*/, "store60m");


So, step by step:

   * You use a windowed aggregation only once, from there on you use
 the KTable abstraction only (which doesn't have windowed
 aggregations).
   * In each subsequent groupBy you map the key to a pair of
 (your-real-key, timestamp) where the timestamp is rounded down
 with the precision of the size of the new window.
   * reduce() on a KGroupedTable takes an adder and a subtractor and it
 will correctly update the new aggregate by first subtracting the
 previous value of the upstream record before adding the new value
 (this way, just as you said, the downstream is aware of the
 statefulness of the upstream and correctly treats each record as
 an update)
   * If you want to reduce message volume further, you can break these
 into separate KafkaStreams instances and configure downstream ones
 with a higher commit.interval.ms (unfortunately you can't have
 different values of this setting in different places of the same
 topology I'm afraid)
   * TODO: Look into retention policies, I haven't investigated that in
 any detail.

I haven't tested this exact code, so please excuse any typos.

Also, if someone with more experience could chip in and check if I'm
not talking nonsense here, or if there's an easier way to this, that
would be great.


I don't know if the alternative approach is possible, where you
convert each resulting KTable back into a stream and just do a
windowed aggregation somehow. That would feel more natural, but I
haven't figured out how to correctly window over a changelog in the
KStream abstraction, feels impossible in the high

Re: How to chain increasing window operations one after another

2017-05-09 Thread Michal Borowiecki
ts as well as the Generic typing on some of these classes
(yea you KeyValueMapper), makes for long code!  I had to specify them
everywhere since the key/val's changed.


I didn't get enough time to mess with it today, I will wrap up the unit
tests and run it to see how it performs against my real data as well
tomorrow.  I expect a huge reduction in resources (both streams and kafka
storage) by moving to this.
Thank you!



On Mon, May 8, 2017 at 5:26 PM, Matthias J. Sax <matth...@confluent.io>
wrote:


Michal,

that's an interesting idea. In an ideal world, Kafka Streams should have
an optimizer that is able to do this automatically under the hood. Too
bad we are not there yet.

@Garret: did you try this out?

This seems to be a question that might affect many users, and it might
be worth documenting it somewhere as a recommended pattern.


-Matthias


On 5/8/17 1:43 AM, Michal Borowiecki wrote:

Apologies,

In the code snippet of course only oneMinuteWindowed KTable will have a
Windowed key (KTable<Windowed, Value>), all others would be just
KTable<Tuple2<Key, Long>, Value>.

Michał

On 07/05/17 16:09, Michal Borowiecki wrote:

Hi Garrett,

I've encountered a similar challenge in a project I'm working on (it's
still work in progress, so please take my suggestions with a grain of
salt).

Yes, I believe KTable.groupBy lets you accomplish what you are aiming
for with something like the following (same snippet attached as txt

file):


KTable<Windowed, Value> oneMinuteWindowed = yourStream//
where Key and Value stand for your actual key and value types

 .groupByKey()

 .reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000),

"store1m");

 //where your adder can be as simple as (val, agg) -> agg + val

 //for primitive types or as complex as you need


KTable<Windowed<Tuple2<Key, Long>>, Value> fiveMinuteWindowed =
oneMinuteWindowed// Tuple2 for this example as defined by
javaslang library

 .groupBy( (windowedKey, value) -> new KeyValue<>(new
Tuple2<>(windowedKey.key(), windowedKey.window().end() /1000/60/5
*1000*60*5), value)

 // the above rounds time down to a timestamp divisible by 5
minutes

 .reduce(/*your adder*/, /*your subtractor*/, "store5m");

 // where your subtractor can be as simple as (val, agg) -> agg
- valfor primitive types or as complex as you need,

 // just make sure you get the order right (lesson hard learnt
;) ), subtraction is not commutative!


KTable<Windowed<Tuple2<Key, Long>>, Value> fifteenMinuteWindowed =
fiveMinuteWindowed

 .groupBy( (keyPair, value) -> new KeyValue<>(new
Tuple2(keyPair._1, keyPair._2/1000/60/15 *1000*60*15), value)

 // the above rounds time down to a timestamp divisible by 15
minutes

 .reduce(/*your adder*/, /*your subtractor*/, "store15m");


KTable<Windowed<Tuple2<Key, Long>>, Value> sixtyMinuteWindowed =
fifteeenMinuteWindowed

 .groupBy( (keyPair, value) -> new KeyValue<>(new
Tuple2(keyPairair._1, pair._2 /1000/60/60 *1000*60*60), value)

 // the above rounds time down to a timestamp divisible by 5
minutes

 .reduce(/*your adder*/, /*your subtractor*/, "store60m");


So, step by step:

   * You use a windowed aggregation only once, from there on you use
 the KTable abstraction only (which doesn't have windowed
 aggregations).
   * In each subsequent groupBy you map the key to a pair of
 (your-real-key, timestamp) where the timestamp is rounded down
 with the precision of the size of the new window.
   * reduce() on a KGroupedTable takes an adder and a subtractor and it
 will correctly update the new aggregate by first subtracting the
 previous value of the upstream record before adding the new value
 (this way, just as you said, the downstream is aware of the
 statefulness of the upstream and correctly treats each record as
 an update)
   * If you want to reduce message volume further, you can break these
 into separate KafkaStreams instances and configure downstream ones
 with a higher commit.interval.ms (unfortunately you can't have
 different values of this setting in different places of the same
 topology I'm afraid)
   * TODO: Look into retention policies, I haven't investigated that in
 any detail.

I haven't tested this exact code, so please excuse any typos.

Also, if someone with more experience could chip in and check if I'm
not talking nonsense here, or if there's an easier way to this, that
would be great.


I don't know if the alternative approach is possible, where you
convert each resulting KTable back into a stream and just do a
windowed aggregation somehow. That would feel more natural, but I
haven't figured out how to correctly window over a changelog in the
KStream abstraction, feels impossible in the high-level DSL.

H

Re: How to chain increasing window operations one after another

2017-05-09 Thread Michal Borowiecki

This seems to be a question that might affect many users, and it might
be worth documenting it somewhere as a recommended pattern.

I was thinking the same thing :)

How about a page on the wiki listing useful patterns with subpages for 
each patten in detail? (like for KIPs)


Thanks,

Michał


On 08/05/17 22:26, Matthias J. Sax wrote:

Michal,

that's an interesting idea. In an ideal world, Kafka Streams should have
an optimizer that is able to do this automatically under the hood. Too
bad we are not there yet.

@Garret: did you try this out?

This seems to be a question that might affect many users, and it might
be worth documenting it somewhere as a recommended pattern.


-Matthias


On 5/8/17 1:43 AM, Michal Borowiecki wrote:

Apologies,

In the code snippet of course only oneMinuteWindowed KTable will have a
Windowed key (KTable<Windowed, Value>), all others would be just
KTable<Tuple2<Key, Long>, Value>.

Michał

On 07/05/17 16:09, Michal Borowiecki wrote:

Hi Garrett,

I've encountered a similar challenge in a project I'm working on (it's
still work in progress, so please take my suggestions with a grain of
salt).

Yes, I believe KTable.groupBy lets you accomplish what you are aiming
for with something like the following (same snippet attached as txt file):


KTable<Windowed, Value> oneMinuteWindowed = yourStream//
where Key and Value stand for your actual key and value types

 .groupByKey()

 .reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000), "store1m");

 //where your adder can be as simple as (val, agg) -> agg + val

 //for primitive types or as complex as you need


KTable<Windowed<Tuple2<Key, Long>>, Value> fiveMinuteWindowed =
oneMinuteWindowed// Tuple2 for this example as defined by
javaslang library

 .groupBy( (windowedKey, value) -> new KeyValue<>(new
Tuple2<>(windowedKey.key(), windowedKey.window().end() /1000/60/5
*1000*60*5), value)

 // the above rounds time down to a timestamp divisible by 5
minutes

 .reduce(/*your adder*/, /*your subtractor*/, "store5m");

 // where your subtractor can be as simple as (val, agg) -> agg
- valfor primitive types or as complex as you need,

 // just make sure you get the order right (lesson hard learnt
;) ), subtraction is not commutative!


KTable<Windowed<Tuple2<Key, Long>>, Value> fifteenMinuteWindowed =
fiveMinuteWindowed

 .groupBy( (keyPair, value) -> new KeyValue<>(new
Tuple2(keyPair._1, keyPair._2/1000/60/15 *1000*60*15), value)

 // the above rounds time down to a timestamp divisible by 15
minutes

 .reduce(/*your adder*/, /*your subtractor*/, "store15m");


KTable<Windowed<Tuple2<Key, Long>>, Value> sixtyMinuteWindowed =
fifteeenMinuteWindowed

 .groupBy( (keyPair, value) -> new KeyValue<>(new
Tuple2(keyPairair._1, pair._2 /1000/60/60 *1000*60*60), value)

 // the above rounds time down to a timestamp divisible by 5
minutes

 .reduce(/*your adder*/, /*your subtractor*/, "store60m");


So, step by step:

   * You use a windowed aggregation only once, from there on you use
 the KTable abstraction only (which doesn't have windowed
 aggregations).
   * In each subsequent groupBy you map the key to a pair of
 (your-real-key, timestamp) where the timestamp is rounded down
 with the precision of the size of the new window.
   * reduce() on a KGroupedTable takes an adder and a subtractor and it
 will correctly update the new aggregate by first subtracting the
 previous value of the upstream record before adding the new value
 (this way, just as you said, the downstream is aware of the
 statefulness of the upstream and correctly treats each record as
 an update)
   * If you want to reduce message volume further, you can break these
 into separate KafkaStreams instances and configure downstream ones
 with a higher commit.interval.ms (unfortunately you can't have
 different values of this setting in different places of the same
 topology I'm afraid)
   * TODO: Look into retention policies, I haven't investigated that in
 any detail.

I haven't tested this exact code, so please excuse any typos.

Also, if someone with more experience could chip in and check if I'm
not talking nonsense here, or if there's an easier way to this, that
would be great.


I don't know if the alternative approach is possible, where you
convert each resulting KTable back into a stream and just do a
windowed aggregation somehow. That would feel more natural, but I
haven't figured out how to correctly window over a changelog in the
KStream abstraction, feels impossible in the high-level DSL.

Hope that helps,
Michal

On 02/05/17 18:03, Garrett Barton wrote:

Lets say I want to sum values over increasing window sizes of 1,5,15,60
minutes.  Right now I have the

Re: How to chain increasing window operations one after another

2017-05-08 Thread Michal Borowiecki

Apologies,

In the code snippet of course only oneMinuteWindowed KTable will have a 
Windowed key (KTable<Windowed, Value>), all others would be just 
KTable<Tuple2<Key, Long>, Value>.


Michał

On 07/05/17 16:09, Michal Borowiecki wrote:


Hi Garrett,

I've encountered a similar challenge in a project I'm working on (it's 
still work in progress, so please take my suggestions with a grain of 
salt).


Yes, I believe KTable.groupBy lets you accomplish what you are aiming 
for with something like the following (same snippet attached as txt file):



KTable<Windowed, Value> oneMinuteWindowed = yourStream// 
where Key and Value stand for your actual key and value types


.groupByKey()

.reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000), "store1m");

//where your adder can be as simple as (val, agg) -> agg + val

//for primitive types or as complex as you need


KTable<Windowed<Tuple2<Key, Long>>, Value> fiveMinuteWindowed = 
oneMinuteWindowed// Tuple2 for this example as defined by 
javaslang library


.groupBy( (windowedKey, value) -> new KeyValue<>(new 
Tuple2<>(windowedKey.key(), windowedKey.window().end() /1000/60/5 
*1000*60*5), value)


// the above rounds time down to a timestamp divisible by 5 
minutes


.reduce(/*your adder*/, /*your subtractor*/, "store5m");

// where your subtractor can be as simple as (val, agg) -> agg 
- valfor primitive types or as complex as you need,


// just make sure you get the order right (lesson hard learnt 
;) ), subtraction is not commutative!



KTable<Windowed<Tuple2<Key, Long>>, Value> fifteenMinuteWindowed = 
fiveMinuteWindowed


.groupBy( (keyPair, value) -> new KeyValue<>(new 
Tuple2(keyPair._1, keyPair._2/1000/60/15 *1000*60*15), value)


// the above rounds time down to a timestamp divisible by 15 
minutes


.reduce(/*your adder*/, /*your subtractor*/, "store15m");


KTable<Windowed<Tuple2<Key, Long>>, Value> sixtyMinuteWindowed = 
fifteeenMinuteWindowed


.groupBy( (keyPair, value) -> new KeyValue<>(new 
Tuple2(keyPairair._1, pair._2 /1000/60/60 *1000*60*60), value)


// the above rounds time down to a timestamp divisible by 5 
minutes


.reduce(/*your adder*/, /*your subtractor*/, "store60m");


So, step by step:

  * You use a windowed aggregation only once, from there on you use
the KTable abstraction only (which doesn't have windowed
aggregations).
  * In each subsequent groupBy you map the key to a pair of
(your-real-key, timestamp) where the timestamp is rounded down
with the precision of the size of the new window.
  * reduce() on a KGroupedTable takes an adder and a subtractor and it
will correctly update the new aggregate by first subtracting the
previous value of the upstream record before adding the new value
(this way, just as you said, the downstream is aware of the
statefulness of the upstream and correctly treats each record as
an update)
  * If you want to reduce message volume further, you can break these
into separate KafkaStreams instances and configure downstream ones
with a higher commit.interval.ms (unfortunately you can't have
different values of this setting in different places of the same
topology I'm afraid)
  * TODO: Look into retention policies, I haven't investigated that in
any detail.

I haven't tested this exact code, so please excuse any typos.

Also, if someone with more experience could chip in and check if I'm 
not talking nonsense here, or if there's an easier way to this, that 
would be great.



I don't know if the alternative approach is possible, where you 
convert each resulting KTable back into a stream and just do a 
windowed aggregation somehow. That would feel more natural, but I 
haven't figured out how to correctly window over a changelog in the 
KStream abstraction, feels impossible in the high-level DSL.


Hope that helps,
Michal

On 02/05/17 18:03, Garrett Barton wrote:

Lets say I want to sum values over increasing window sizes of 1,5,15,60
minutes.  Right now I have them running in parallel, meaning if I am
producing 1k/sec records I am consuming 4k/sec to feed each calculation.
In reality I am calculating far more than sum, and in this pattern I'm
looking at something like (producing rate)*(calculations)*(windows) for a
consumption rate.

  So I had the idea, could I feed the 1 minute window into the 5 minute, and
5 into 15, and 15 into 60.  Theoretically I would consume a fraction of the
records, not have to scale as huge and be back to something like (producing
rate)*(calculations)+(updates).

   Thinking this is an awesome idea I went to try and implement it and got
twisted around.  These are windowed grouping operations that produce
KTables, which means instead of a raw stre

Re: How to chain increasing window operations one after another

2017-05-07 Thread Michal Borowiecki
he latest values for say the 5
1 minute sum's in a given window, to perform the 5 minute sum.  Reading the
docs which are awesome, I cannot determine if the KTable.groupby() would
work over a window, and would reduce or aggregate thus do what I need?

Any ideas?





KTable<Windowed<Key>, Value> oneMinuteWindowed = yourStream // where Key and Value stand for your actual key and value types

    .groupByKey()

    .reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000), "store1m");

    // where your adder can be as simple as (val, agg) -> agg + val
    // for primitive types or as complex as you need


KTable<Tuple2<Key, Long>, Value> fiveMinuteWindowed = oneMinuteWindowed // Tuple2 for this example as defined by the javaslang library

    .groupBy( (windowedKey, value) -> new KeyValue<>(new Tuple2<>(windowedKey.key(), windowedKey.window().end() /1000/60/5 *1000*60*5), value) )

    // the above rounds time down to a timestamp divisible by 5 minutes

    .reduce(/*your adder*/, /*your subtractor*/, "store5m");

    // where your subtractor can be as simple as (val, agg) -> agg - val for primitive types or as complex as you need,
    // just make sure you get the order right (lesson hard learnt ;) ), subtraction is not commutative!


KTable<Tuple2<Key, Long>, Value> fifteenMinuteWindowed = fiveMinuteWindowed

    .groupBy( (keyPair, value) -> new KeyValue<>(new Tuple2<>(keyPair._1, keyPair._2 /1000/60/15 *1000*60*15), value) )

    // the above rounds time down to a timestamp divisible by 15 minutes

    .reduce(/*your adder*/, /*your subtractor*/, "store15m");


KTable<Tuple2<Key, Long>, Value> sixtyMinuteWindowed = fifteenMinuteWindowed

    .groupBy( (keyPair, value) -> new KeyValue<>(new Tuple2<>(keyPair._1, keyPair._2 /1000/60/60 *1000*60*60), value) )

    // the above rounds time down to a timestamp divisible by 60 minutes

    .reduce(/*your adder*/, /*your subtractor*/, "store60m");
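
As a side note, the /1000/60/5 *1000*60*5 expressions above are just integer-division rounding. Spelled out on its own it looks like this (the sample timestamp is arbitrary):

public class BucketRounding {
    public static void main(String[] args) {
        long fiveMinutesMs = 5 * 60 * 1000L;
        long timestamp = 1494172123456L;                              // some epoch-millis timestamp
        long bucketStart = timestamp / fiveMinutesMs * fiveMinutesMs; // integer division drops the remainder
        System.out.println(bucketStart);                              // start of the 5-minute bucket containing the timestamp
    }
}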


Re: Does Kafka producer waits till previous batch returns responce before sending next one?

2017-04-30 Thread Michal Borowiecki

Yes, that's what the docs say in both places:

max.in.flight.requests.per.connection: The maximum number of 
unacknowledged requests the client will send on a single connection 
before blocking. Note that if this setting is set to be greater than 1 
and there are failed sends, there is a risk of message re-ordering due 
to retries (i.e., if retries are enabled).

retries: Setting a value greater than zero will cause the client to 
resend any record whose send fails with a potentially transient error. 
Note that this retry is no different than if the client resent the 
record upon receiving the error. Allowing retries without setting 
max.in.flight.requests.per.connection to 1 will potentially change the 
ordering of records because if two batches are sent to a single 
partition, and the first fails and is retried but the second succeeds, 
then the records in the second batch may appear first.
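
For reference, a minimal sketch of a producer configured along those lines (ordering preserved across retries at the cost of throughput). The broker address, topic, serializers and the exact retries value are placeholders:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class OrderedProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // only one in-flight request per connection, so a retried batch cannot be overtaken by a later one
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        // retry transient failures instead of surfacing them immediately
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}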



Cheers,

Michał


On 30/04/17 19:32, Jun MA wrote:

Does this mean that if the client has retries > 0 and 
max.in.flight.requests.per.connection > 1, then even if the topic only has one 
partition, there's still no guarantee of the ordering?

Thanks,
Jun

On Apr 30, 2017, at 7:57 AM, Hans Jespersen <h...@confluent.io> wrote:

There is a parameter that controls this behavior called 
max.in.flight.requests.per.connection

If you set max.in.flight.requests.per.connection = 1 then the producer waits 
until the previous produce request returns a response before sending the next one 
(or retrying). The retries parameter controls the number of times to attempt 
to produce a batch after a failure.

If max.in.flight.requests.per.connection = 1 and retries is set to the maximum then 
ordering is guaranteed.

If there is a timeout then the producer library would try again and again to 
produce the message and will not skip over to try and produce the next message.

If you set max.in.flight.requests.per.connection > 1 (I think the default is 5) then 
you can get a commit log with messages out of order wrt the original published 
order (because retries are done in parallel rather than in series)

-hans




On Apr 30, 2017, at 3:13 AM, Petr Novak <oss.mli...@gmail.com> wrote:

Hello,

Does Kafka producer waits till previous batch returns response before
sending next one? Do I assume correctly that it does not when retries can
change ordering?



Hence batches delay is introduced only by producer internal send loop time
and linger?



If a timeout would be localized only to a single batch send request for some
reason, does it affect the next batch (assuming this batch can go through
successfully)?



Many thanks,

Petr







Re: Caching in Kafka Streams to ignore garbage message

2017-04-30 Thread Michal Borowiecki

Apologies, I must have not made myself clear.

I meant the values in the records coming from the input topic (which in 
turn are coming from kafka connect in the example at hand)


and not the records coming out of the join.

My intention was to warn against sending null values from kafka connect 
to the topic that is then meant to be read-in as a ktable to filter against.



Am I clearer now?
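
To make that concrete, here is a hedged sketch of the overall pattern being discussed: the tracked ids are loaded into a KTable with some non-null marker value (an empty string will do), and an inner KStream-KTable join acts as the filter. Topic names, types and the 0.10.2-era setup are assumptions for illustration only.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Properties;

public class TrackedIdFilter {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        // tracked ids, e.g. imported by Kafka Connect; the value must be non-null,
        // otherwise the record is treated as a delete and dropped from the KTable
        KTable<String, String> trackedIds = builder.table("tracked-ids", "tracked-ids-store");

        KStream<String, String> events = builder.stream("events");

        // inner join drops every event whose id has no entry in the KTable
        KStream<String, String> trackedEvents =
            events.join(trackedIds, (eventValue, marker) -> eventValue);

        trackedEvents.to("tracked-events");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "tracked-id-filter");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        new KafkaStreams(builder, props).start();
    }
}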


Cheers,

Michał


On 30/04/17 18:14, Matthias J. Sax wrote:

Your observation is correct.

If  you use inner KStream-KTable join, the join will implement the
filter automatically as the join will not return any result.


-Matthias



On 4/30/17 7:23 AM, Michal Borowiecki wrote:

I have something working on the same principle (except not using
connect), that is, I put ids to filter on into a ktable and then (inner)
join a kstream with that ktable.

I don't believe the value can be null though. In a changelog a null value
is interpreted as a delete, so it won't be put into a ktable.

The RocksDB store, for one, does this:

private void putInternal(byte[] rawKey, byte[] rawValue) {
 if (rawValue == null) {
 try {
 db.delete(wOptions, rawKey);

But any non-null value would do.
Please correct me if I misunderstood.

Cheers,
Michał

On 27/04/17 22:44, Matthias J. Sax wrote:

I'd like to avoid repeated trips to the db, and caching a large amount of
data in memory.

Lookups to the DB would be hard to get done anyway. Ie, it would not
perform well, as all your calls would need to be synchronous...



Is it possible to send a message w/ the id as the partition key to a topic,
and then use the same id as the key, so the same node which will receive
the data for an id is the one which will process it?

That is what I did propose (maybe it was not clear). If you use Connect,
you can just import the ID into Kafka and leave the value empty (ie,
null). This reduces your cache data to a minimum. And the KStream-KTable
join works as you described it :)


-Matthias

On 4/27/17 2:37 PM, Ali Akhtar wrote:

I'd like to avoid repeated trips to the db, and caching a large amount of
data in memory.

Is it possible to send a message w/ the id as the partition key to a topic,
and then use the same id as the key, so the same node which will receive
the data for an id is the one which will process it?


On Fri, Apr 28, 2017 at 2:32 AM, Matthias J. Sax <matth...@confluent.io>
wrote:


The recommended solution would be to use Kafka Connect to load your DB
data into a Kafka topic.

With Kafka Streams you read your db-topic as a KTable and do an (inner)
KStream-KTable join to look up the IDs.


-Matthias

On 4/27/17 2:22 PM, Ali Akhtar wrote:

I have a Kafka topic which will receive a large amount of data.

This data has an 'id' field. I need to look up the id in an external db,
see if we are tracking that id, and if yes, we process that message, if
not, we ignore it.

99% of the data will be for ids which are not being tracked - 1% or so

will

be for ids which are tracked.

My concern is, that there'd be a lot of round trips to the db made just

to

check the id, and if it'd be better to cache the ids being tracked
somewhere, so other ids are ignored.

I was considering sending a message to another (or the same topic)

whenever

a new id is added to the track list, and that id should then get

processed

on the node which will process the messages.

Should I just cache all ids on all nodes (which may be a large amount),

or

is there a way to only cache the id on the same kafka streams node which
will receive data for that id?






Re: Issue in Kafka running for few days

2017-04-30 Thread Michal Borowiecki

Ah, yes, you're right. I misread it.

My bad. Apologies.

Michal

On 30/04/17 16:02, Svante Karlsson wrote:

@michal

My interpretation is that he's running 2 instances of zookeeper - not 
6. (1 on the "4 broker machine" and one on the other)


I'm not sure where that leaves you in zookeeper land - ie if you 
happen to have a timeout between the two zookeepers will you be out of 
service or will you have a split brain problem? None of the 
alternatives are good. That said - it should be visible in the logs.


Anyway two zk is not a good config - stick to one or go to three.





2017-04-30 15:41 GMT+02:00 Michal Borowiecki 
<michal.borowie...@openbet.com <mailto:michal.borowie...@openbet.com>>:


Hi Jan,

Correct. As I said before it's not common or recommended practice
to run an even number, and I wouldn't recommend it myself. I hope
it didn't sound as if I did.

However, I don't see how this would cause the issue at hand unless
at least 3 out of the 6 zookeepers died, but that could also have
happened in a 5 node setup.

In either case, changing the number of zookeepers is not a
prerequisite to progress debugging this issue further.

Cheers,

Michal


On 30/04/17 13:35, jan wrote:

I looked this up yesterday  when I read the grandparent, as my old
company ran two and I needed to know.
Your link is a bit ambiguous but it has a link to the zookeeper
Getting Started guide which says this:

"
For replicated mode, a minimum of three servers are required, and it
is strongly recommended that you have an odd number of servers. If you
only have two servers, then you are in a situation where if one of
them fails, there are not enough machines to form a majority quorum.
Two servers is inherently less stable than a single server, because
there are two single points of failure.
"

<https://zookeeper.apache.org/doc/r3.4.10/zookeeperStarted.html>
<https://zookeeper.apache.org/doc/r3.4.10/zookeeperStarted.html>

    cheers

    jan


On 30/04/2017, Michal Borowiecki<michal.borowie...@openbet.com>
<mailto:michal.borowie...@openbet.com>  wrote:

Svante, I don't share your opinion.
Having an even number of zookeepers is not a problem in itself, it
simply means you don't get any better resilience than if you had one
fewer instance.
Yes, it's not common or recommended practice, but you are allowed to
have an even number of zookeepers and it's most likely not related to
the problem at hand and does NOT need to be addressed first.

https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_zkMulitServerSetup

<https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_zkMulitServerSetup>

Abhit, I'm afraid the log snippet is not enough for me to help.
Maybe someone else in the community with more experience can recognize
the symptoms but in the meantime, if you haven't already done so, you
may want to search for similar issues:


https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20text%20~%20%22ZK%20expired%3B%20shut%20down%20all%20controller%22

<https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20text%20%7E%20%22ZK%20expired%3B%20shut%20down%20all%20controller%22>

searching for text like "ZK expired; shut down all controller" or "No
broker in ISR is alive for" or other interesting events form the log.

Hope that helps,
Michal


On 26/04/17 21:40, Svante Karlsson wrote:

You are not supposed to run an even number of zookeepers. Fix that first

On Apr 26, 2017 20:59, "Abhit Kalsotra"<abhit...@gmail.com> 
<mailto:abhit...@gmail.com>  wrote:


Any pointers please


Abhi

On Wed, Apr 26, 2017 at 11:03 PM, Abhit Kalsotra<abhit...@gmail.com> 
<mailto:abhit...@gmail.com>
wrote:


Hi *

My kafka setup


**OS: Windows Machine*6 broker nodes , 4 on one Machine and 2 on other
Machine*

**ZK instance on (4 broker nodes Machine) and another ZK on (2 broker
nodes machine)*
** 2 Topics with partition size = 50 and replication factor = 3*

I am producing on an average of around 500 messages / sec with each
message size close to 98 bytes...

More or less the message rate stays constant throughout, but after

running

the setup for close to 2 weeks , my Kafka cluster broke and this
happened
twice in a month.  Not able to understand what's the issue, Kafka gurus
please do share your inputs...

the controlle.log file at the time of Kafka broken looks like




*[2017-04-26 12:03:34,998] INFO [Controller 0]: Broker failure callback
for 0,1,3,5,6 (kafka.controller.KafkaController)[2017-04-26

12:03:34,998]

INFO [Controller 0]: Removed ArrayBuffer() from list of shut

Re: Controller connection failures

2017-04-30 Thread Michal Borowiecki

Hi Chuck,

Are you running zookeepers in the same containers as Kafka brokers?

Kafka brokers should be able to communicate with any of the zookeepers 
and, more importantly, zookeepers need to be able to talk to each other.


Therefore, the zookeeper port should be exposed too (2181 by default), 
otherwise you're not going to have a valid cluster.


Docs on the controller are here:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Internals

Let me know if that helped.

Cheers,
Michał

On 27/04/17 19:08, Chuck Musser wrote:

I'm running into a problem with a 3 broker cluster where,
intermittently, one of the broker's controller begins to report that
it cannot connect to the other brokers and repeatedly logs the
failure.

Each broker is running in its own Docker container on separate
machines.  These Docker containers have exposed 9092, which I think is
sufficient for operation, but not sure.

The log message are these:
[2017-04-27 17:16:28,985] WARN [Controller-3-to-broker-2-send-thread], 
Controller 3's connection to broker 64174aa85d04:9092 (id: 2 rack: null) was 
unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to 64174aa85d04:9092 (id: 2 rack: null) failed
at 
kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
at 
kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
at 
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185)
at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2017-04-27 17:16:28,986] WARN [Controller-3-to-broker-1-send-thread], 
Controller 3's connection to broker d4b8943ad4b5:9092 (id: 1 rack: null) was 
unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to d4b8943ad4b5:9092 (id: 1 rack: null) failed
at 
kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
at 
kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
at 
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185)
at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

This is Kafka 2.12-0.10.2.0. I'm wondering:

1. How do we figure out the cause of the connect failures?
2. What's the controller anyway?
3. Are there some command-line diagnostic tools for inspecting the health of 
the system?

Thanks for any help,
Chuck







Re: Caching in Kafka Streams to ignore garbage message

2017-04-30 Thread Michal Borowiecki
I have something working on the same principle (except not using 
connect), that is, I put ids to filter on into a ktable and then (inner) 
join a kstream with that ktable.


I don't believe the value can be null though. In a changelog a null value 
is interpreted as a delete, so it won't be put into a ktable.


The RocksDB store, for one, does this:

private void putInternal(byte[] rawKey,byte[] rawValue) {
if (rawValue ==null) {
try {
db.delete(wOptions, rawKey);

But any non-null value would do.
Please correct me if I misunderstood.

Cheers,
Michał

On 27/04/17 22:44, Matthias J. Sax wrote:

I'd like to avoid repeated trips to the db, and caching a large amount of
data in memory.

Lookups to the DB would be hard to get done anyway. Ie, it would not
perform well, as all your calls would need to be synchronous...



Is it possible to send a message w/ the id as the partition key to a topic,
and then use the same id as the key, so the same node which will receive
the data for an id is the one which will process it?

That is what I did propose (maybe it was not clear). If you use Connect,
you can just import the ID into Kafka and leave the value empty (ie,
null). This reduces your cache data to a minimum. And the KStream-KTable
join works as you described it :)


-Matthias

On 4/27/17 2:37 PM, Ali Akhtar wrote:

I'd like to avoid repeated trips to the db, and caching a large amount of
data in memory.

Is it possible to send a message w/ the id as the partition key to a topic,
and then use the same id as the key, so the same node which will receive
the data for an id is the one which will process it?


On Fri, Apr 28, 2017 at 2:32 AM, Matthias J. Sax <matth...@confluent.io>
wrote:


The recommended solution would be to use Kafka Connect to load your DB
data into a Kafka topic.

With Kafka Streams you read your db-topic as a KTable and do an (inner)
KStream-KTable join to look up the IDs.


-Matthias

On 4/27/17 2:22 PM, Ali Akhtar wrote:

I have a Kafka topic which will receive a large amount of data.

This data has an 'id' field. I need to look up the id in an external db,
see if we are tracking that id, and if yes, we process that message, if
not, we ignore it.

99% of the data will be for ids which are not being tracked - 1% or so

will

be for ids which are tracked.

My concern is, that there'd be a lot of round trips to the db made just

to

check the id, and if it'd be better to cache the ids being tracked
somewhere, so other ids are ignored.

I was considering sending a message to another (or the same topic)

whenever

a new id is added to the track list, and that id should then get

processed

on the node which will process the messages.

Should I just cache all ids on all nodes (which may be a large amount),
or is there a way to only cache the id on the same kafka streams node which
will receive data for that id?









Re: Issue in Kafka running for few days

2017-04-30 Thread Michal Borowiecki

Hi Jan,

Correct. As I said before, it's not common or recommended practice to run
an even number of zookeepers, and I wouldn't recommend it myself. I hope it
didn't sound as if I did.


However, I don't see how this would cause the issue at hand unless at
least 3 out of the 6 zookeepers died - but that could equally have happened
in a 5-node setup.
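
For the arithmetic behind that, a quick illustrative sketch (not from the original thread): an ensemble of n ZooKeeper servers needs a majority of floor(n/2) + 1 to stay available, so it tolerates n minus that many failures.

// Illustrative only: quorum size and tolerated failures per ensemble size.
public class ZkQuorumMath {
    public static void main(String[] args) {
        for (int n = 3; n <= 6; n++) {
            int quorum = n / 2 + 1;      // majority required to serve requests
            int tolerated = n - quorum;  // servers that may fail before quorum is lost
            System.out.printf("%d servers -> quorum %d, tolerates %d failure(s)%n",
                    n, quorum, tolerated);
        }
        // Prints 1, 1, 2, 2 tolerated failures for 3, 4, 5, 6 servers:
        // a 6-node ensemble survives no more failures than a 5-node one.
    }
}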


In either case, changing the number of zookeepers is not a prerequisite
for debugging this issue further.


Cheers,

Michal


On 30/04/17 13:35, jan wrote:

I looked this up yesterday when I read the grandparent, as my old
company ran two and I needed to know.
Your link is a bit ambiguous, but it links to the ZooKeeper
Getting Started guide, which says this:

"
For replicated mode, a minimum of three servers are required, and it
is strongly recommended that you have an odd number of servers. If you
only have two servers, then you are in a situation where if one of
them fails, there are not enough machines to form a majority quorum.
Two servers is inherently less stable than a single server, because
there are two single points of failure.
"

<https://zookeeper.apache.org/doc/r3.4.10/zookeeperStarted.html>

cheers

jan


On 30/04/2017, Michal Borowiecki <michal.borowie...@openbet.com> wrote:

Svante, I don't share your opinion.
Having an even number of zookeepers is not a problem in itself; it
simply means you don't get any better resilience than if you had one
fewer instance.
Yes, it's not common or recommended practice, but you are allowed to
have an even number of zookeepers, it's most likely not related to
the problem at hand, and it does NOT need to be addressed first.
https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_zkMulitServerSetup

Abhit, I'm afraid the log snippet is not enough for me to help.
Maybe someone else in the community with more experience can recognize
the symptoms but in the meantime, if you haven't already done so, you
may want to search for similar issues:

https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20text%20~%20%22ZK%20expired%3B%20shut%20down%20all%20controller%22

searching for text like "ZK expired; shut down all controller" or "No
broker in ISR is alive for" or other interesting events from the log.

Hope that helps,
Michal


On 26/04/17 21:40, Svante Karlsson wrote:

You are not supposed to run an even number of zookeepers. Fix that first

On Apr 26, 2017 20:59, "Abhit Kalsotra" <abhit...@gmail.com> wrote:


Any pointers please


Abhi

On Wed, Apr 26, 2017 at 11:03 PM, Abhit Kalsotra <abhit...@gmail.com>
wrote:


Hi *

My kafka setup:

* OS: Windows
* 6 broker nodes: 4 on one machine and 2 on the other
* One ZK instance on the 4-broker machine and another ZK on the 2-broker machine
* 2 topics, each with 50 partitions and replication factor 3

I am producing an average of around 500 messages/sec, with each
message size close to 98 bytes...

The message rate stays more or less constant throughout, but after running
the setup for close to 2 weeks, my Kafka cluster broke, and this has happened
twice in a month. I'm not able to understand what the issue is - Kafka gurus,
please do share your inputs...

The controller.log file at the time the Kafka cluster broke looks like:




*[2017-04-26 12:03:34,998] INFO [Controller 0]: Broker failure callback
for 0,1,3,5,6 (kafka.controller.KafkaController)[2017-04-26

12:03:34,998]

INFO [Controller 0]: Removed ArrayBuffer() from list of shutting down
brokers. (kafka.controller.KafkaController)[2017-04-26 12:03:34,998]

INFO

[Partition state machine on Controller 0]: Invoking state change to
OfflinePartition for partitions
[__consumer_offsets,19],[mytopic,11],[__consumer_

offsets,30],[mytopicOLD,18],[mytopic,13],[__consumer_
offsets,47],[mytopicOLD,26],[__consumer_offsets,29],[
mytopicOLD,0],[__consumer_offsets,41],[mytopic,44],[
mytopicOLD,38],[mytopicOLD,2],[__consumer_offsets,17],[__
consumer_offsets,10],[mytopic,20],[mytopic,23],[mytopic,30],
[__consumer_offsets,14],[__consumer_offsets,40],[mytopic,
31],[mytopicOLD,43],[mytopicOLD,19],[mytopicOLD,35]
,[__consumer_offsets,18],[mytopic,43],[__consumer_offsets,26],[__consumer_
offsets,0],[mytopic,32],[__consumer_offsets,24],[
mytopicOLD,3],[mytopic,2],[mytopic,3],[mytopicOLD,45],[
mytopic,35],[__consumer_offsets,20],[mytopic,1],[
mytopicOLD,33],[__consumer_offsets,5],[mytopicOLD,47],[__
consumer_offsets,22],[mytopicOLD,8],[mytopic,33],[
mytopic,36],[mytopicOLD,11],[mytopic,47],[mytopicOLD,20],[
mytopic,48],[__consumer_offsets,12],[mytopicOLD,32],[_
_consumer_offsets,8],[mytopicOLD,39],[mytopicOLD,27]
,[mytopicOLD,49],[mytopicOLD,42],[mytopic,21],[mytopicOLD,
31],[mytopic,29],[__consumer_offsets,23],[mytopicOLD,21],[_
_consumer_offsets,48],[__consumer_offsets,11],[mytopic,
18],[__consumer_offsets,13],[mytopic,45],[mytopic,5],[
mytopicOLD,25],[mytopic,6],[mytopicOLD,23],[mytopicOLD,37]
,[__consumer_offsets,6],[__consumer_offsets,

Re: Issue in Kafka running for few days

2017-04-30 Thread Michal Borowiecki
ker id 1
(kafka.controller.KafkaController)[2017-04-26 12:03:35,045] DEBUG
[Controller 1]: De-registering IsrChangeNotificationListener
(kafka.controller.KafkaController)[2017-04-26 12:03:35,060] INFO

[Partition

state machine on Controller 1]: Stopped partition state machine
(kafka.controller.PartitionStateMachine)[2017-04-26 12:03:35,060] INFO
[Replica state machine on controller 1]: Stopped replica state machine
(kafka.controller.ReplicaStateMachine)[2017-04-26 12:03:35,060] INFO
[Controller 1]: Broker 1 resigned as the controller
(kafka.controller.KafkaController)[2017-04-26 12:03:36,013] DEBUG
[OfflinePartitionLeaderSelector]: No broker in ISR is alive for
[__consumer_offsets,19]. Pick the leader from the alive assigned

replicas:

(kafka.controller.OfflinePartitionLeaderSelector)[2017-04-26

12:03:36,029]

DEBUG [OfflinePartitionLeaderSelector]:
[mytopic,11]. Pick the leader from the alive assigned replicas:
(kafka.controller.OfflinePartitionLeaderSelector)[2017-04-26

12:03:36,029]

DEBUG [OfflinePartitionLeaderSelector]: No broker in ISR is alive for
[__consumer_offsets,30]. Pick the leader from the alive assigned

replicas:

(kafka.controller.OfflinePartitionLeaderSelector)[2017-04-26

12:03:37,811]

DEBUG [OfflinePartitionLeaderSelector]: Some broker in ISR is alive for
[mytopicOLD,18]. Select 2 from ISR 2 to be the leader.
(kafka.controller.OfflinePartitionLeaderSelector)*

Typical broker config attached. Please do share your inputs...

Abhi
!wq


--
If you can't succeed, call it version 1.0




--
If you can't succeed, call it version 1.0







Re: topics stuck in "Leader: -1" after crash while migrating topics

2017-04-28 Thread Michal Borowiecki

Hi James,

This "Cached zkVersion [x] not equal to that in zookeeper" issue bit us
once in production, and I found these tickets to be relevant:

KAFKA-2729 <https://issues.apache.org/jira/browse/KAFKA-2729>
KAFKA-3042 <https://issues.apache.org/jira/browse/KAFKA-3042>
KAFKA-3083 <https://issues.apache.org/jira/browse/KAFKA-3083>

Unfortunately, I don't believe there is a fix for it yet, or one in the making.

Thanks,
Michał

On 28/04/17 19:26, James Brown wrote:

For what it's worth, shutting down the entire cluster and then restarting
it did address this issue.

I'd love anyone's thoughts on what the "correct" fix would be here.

On Fri, Apr 28, 2017 at 10:58 AM, James Brown <jbr...@easypost.com> wrote:


The following is also appearing in the logs a lot, if anyone has any ideas:

INFO Partition [easypost.syslog,7] on broker 1: Cached zkVersion [647] not
equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)

On Fri, Apr 28, 2017 at 10:43 AM, James Brown <jbr...@easypost.com> wrote:


We're running 0.10.1.0 on a five-node cluster.

I was in the process of migrating some topics from having 2 replicas to
having 3 replicas when two of the five machines in this cluster crashed
(brokers 2 and 3).

After restarting them, all of the topics that were previously assigned to
them are unavailable and showing "Leader: -1".

Example kafka-topics output:

% kafka-topics.sh --zookeeper $ZK_HP --describe  --unavailable-partitions
Topic: __consumer_offsets Partition: 9 Leader: -1 Replicas: 3,2,4 Isr:
Topic: __consumer_offsets Partition: 13 Leader: -1 Replicas: 3,2,4 Isr:
Topic: __consumer_offsets Partition: 17 Leader: -1 Replicas: 3,2,5 Isr:
Topic: __consumer_offsets Partition: 23 Leader: -1 Replicas: 5,2,1 Isr:
Topic: __consumer_offsets Partition: 25 Leader: -1 Replicas: 3,2,5 Isr:
Topic: __consumer_offsets Partition: 26 Leader: -1 Replicas: 3,2,1 Isr:
Topic: __consumer_offsets Partition: 30 Leader: -1 Replicas: 3,1,2 Isr:
Topic: __consumer_offsets Partition: 33 Leader: -1 Replicas: 1,2,4 Isr:
Topic: __consumer_offsets Partition: 35 Leader: -1 Replicas: 1,2,5 Isr:
Topic: __consumer_offsets Partition: 39 Leader: -1 Replicas: 3,1,2 Isr:
Topic: __consumer_offsets Partition: 40 Leader: -1 Replicas: 3,4,2 Isr:
Topic: __consumer_offsets Partition: 44 Leader: -1 Replicas: 3,1,2 Isr:
Topic: __consumer_offsets Partition: 45 Leader: -1 Replicas: 1,3,2 Isr:

Note that I wasn't even moving any of the __consumer_offsets partitions,
so I'm not sure if the fact that a reassignment was in progress is a red
herring or not.

The logs are full of

ERROR [ReplicaFetcherThread-0-3], Error for partition [tracking.syslog,2]
to broker 3:org.apache.kafka.common.errors.UnknownServerException: The
server experienced an unexpected error when processing the request
(kafka.server.ReplicaFetcherThread)
ERROR [ReplicaFetcherThread-0-3], Error for partition [tracking.syslog,2]
to broker 3:org.apache.kafka.common.errors.UnknownServerException: The
server experienced an unexpected error when processing the request
(kafka.server.ReplicaFetcherThread)
ERROR [ReplicaFetcherThread-0-3], Error for partition
[epostg.request_log_v1,0] to broker 
3:org.apache.kafka.common.errors.UnknownServerException:
The server experienced an unexpected error when processing the request
(kafka.server.ReplicaFetcherThread)
ERROR [ReplicaFetcherThread-0-3], Error for partition
[epostg.request_log_v1,0] to broker 
3:org.apache.kafka.common.errors.UnknownServerException:
The server experienced an unexpected error when processing the request
(kafka.server.ReplicaFetcherThread)​

What can I do to fix this? Should I manually reassign all partitions
that were led by brokers 2 or 3 to only have whatever the third broker was
in their replica-set as their replica set? Do I need to temporarily enable
unclean elections?

I've never seen a cluster fail this way...

--
James Brown
Engineer




--
James Brown
Engineer







Re: [VOTE] 0.10.2.1 RC3

2017-04-22 Thread Michal Borowiecki

It's listed below:


* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/



On 22/04/17 19:23, Shimi Kiviti wrote:

Is there a maven repo with these jars so I can test them against our kafka
streams services?

On Sat, Apr 22, 2017 at 9:05 PM, Eno Thereska <eno.there...@gmail.com>
wrote:


+1 tested the usual streams tests as before.

Thanks
Eno

On 21 Apr 2017, at 17:56, Gwen Shapira <g...@confluent.io> wrote:

Hello Kafka users, developers, friends, romans, countrypersons,

This is the fourth (!) candidate for release of Apache Kafka 0.10.2.1.

It is a bug fix release, so we have lots of bug fixes, some super
important.

Release notes for the 0.10.2.1 release:
http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc3/RELEASE_NOTES.html

*** Please download, test and vote by Wednesday, April 26, 2017 ***

Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc3/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/

* Javadoc:
http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc3/javadoc/

* Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.1 tag:
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=

8e4f09caeaa877f06dc75c7da1af7a727e5e599f


* Documentation:
http://kafka.apache.org/0102/documentation.html

* Protocol:
http://kafka.apache.org/0102/protocol.html

/**

Your help in validating this bugfix release is super valuable, so
please take the time to test and vote!

Suggested tests:
* Grab the source archive and make sure it compiles
* Grab one of the binary distros and run the quickstarts against them
* Extract and verify one of the site docs jars
* Build a sample against jars in the staging repo
* Validate GPG signatures on at least one file
* Validate the javadocs look ok
* The 0.10.2 documentation was updated for this bugfix release
(especially upgrade, streams and connect portions) - please make sure
it looks ok: http://kafka.apache.org/documentation.html

But above all, try to avoid finding new bugs - we want to get this release
out the door already :P


Thanks,
Gwen



--
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter <https://twitter.com/ConfluentInc> | blog
<http://www.confluent.io/blog>








Re: [VOTE] 0.10.2.1 RC0

2017-04-12 Thread Michal Borowiecki
FWIW, I upgraded without issue and noticed the speedup from 
KAFKA-4851/KAFKA-4876.


+1 from me (non-binding)


On 12/04/17 02:06, Gwen Shapira wrote:

Wrong link :)
http://kafka.apache.org/documentation/#upgrade
and
http://kafka.apache.org/documentation/streams#streams_api_changes_0102

On Tue, Apr 11, 2017 at 5:57 PM, Gwen Shapira <g...@confluent.io> wrote:

FYI: I just updated the upgrade notes with Streams changes:
http://kafka.apache.org/documentation/#gettingStarted

On Fri, Apr 7, 2017 at 5:12 PM, Gwen Shapira <g...@confluent.io> wrote:

Hello Kafka users, developers and client-developers,

This is the first candidate for the release of Apache Kafka 0.10.2.1. This
is a bug fix release and it includes fixes and improvements from 24 JIRAs
(including a few critical bugs). See the release notes for more details:

http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc0/RELEASE_NOTES.html

*** Please download, test and vote by Thursday, 13 April, 8am PT ***

Your help in validating this bugfix release is super valuable, so
please take the time to test and vote!

Few notes:
1. There are missing "Notable Changes" in the docs:
https://github.com/apache/kafka/pull/2824
I will review, merge and update the docs by Monday.
2. The last commit (the KAFKA-4943 cherry-pick) did not pass system tests
yet. We may need another RC if system tests fail tonight.

Suggested tests:
  * Grab the source archive and make sure it compiles
  * Grab one of the binary distros and run the quickstarts against them
  * Extract and verify one of the site docs jars
  * Build a sample against jars in the staging repo
  * Validate GPG signatures on at least one file
  * Validate the javadocs look ok

*

Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc0/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging

* Javadoc:
http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc0/javadoc/

* Tag to be voted upon (off the 0.10.2 branch) is the 0.10.2.1-rc0 tag:
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=d08115f05da0e39c7f75b45e05d6d14ad5baf71d

* Documentation:
http://kafka.apache.org/0102/documentation.html

* Protocol:
http://kafka.apache.org/0102/protocol.html

Thanks,
Gwen Shapira



--
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog




