Re: KTable aggregations send intermediate results downstream?

2016-08-24 Thread Mathieu Fenniak
Hi Guozhang,

I've been working around this issue by dropping down to the Processor
API, but I was hoping you might be able to point out if there is a
flaw in this proposed change:


https://github.com/apache/kafka/compare/trunk...mfenniak:suppress-duplicate-repartition-output

This adjusts KTableRepartitionMap so that if there's no change in the
group-by key, the repartition processor just forwards the changed
value onwards.  (This breaks a couple of tests that anticipate the
exact existing output, so don't consider this a complete patch...)
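
In rough pseudocode, the change amounts to the following (simplified
stand-in types and names, not the actual diff):

    // Inside the repartition processor: compare the old and new group-by
    // keys before emitting the usual subtract/add pair.
    public void process(K key, Change<V> change) {
        KeyValue<K1, V1> newPair =
            change.newValue == null ? null : mapper.apply(key, change.newValue);
        KeyValue<K1, V1> oldPair =
            change.oldValue == null ? null : mapper.apply(key, change.oldValue);

        if (oldPair != null && newPair != null && oldPair.key.equals(newPair.key)) {
            // group-by key unchanged: forward one combined update instead
            // of a subtract (old) followed by an add (new)
            context().forward(newPair.key, new Change<>(newPair.value, oldPair.value));
        } else {
            if (oldPair != null)   // subtract under the old group-by key
                context().forward(oldPair.key, new Change<>(null, oldPair.value));
            if (newPair != null)   // add under the new group-by key
                context().forward(newPair.key, new Change<>(newPair.value, null));
        }
    }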

Mathieu


On Fri, Aug 19, 2016 at 12:29 PM, Guozhang Wang  wrote:
> Hi Mathieu,
>
> If you are only interested in the aggregate result "snapshot" but not its
> change stream (note that a KTable itself is not actually a "table" as in an
> RDBMS, but still a stream), you can try the queryable state feature that is
> available in trunk and will ship in the 0.10.1.0 release.
>
> In short, it allows you to query the "snapshot" of any state used in
> aggregation operators in real time, through state-store APIs such as
> get-by-key, range queries on windows, etc. Details can be found in this KIP
> (we are working on more docs / blog posts at the moment):
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
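
(For context, the API that eventually shipped with KIP-67 in 0.10.1 looks
roughly like the following; the store name and types are illustrative, and
"streams" is a running KafkaStreams instance:

    ReadOnlyKeyValueStore<String, Long> view =
        streams.store("word-counts", QueryableStoreTypes.keyValueStore());
    Long count = view.get("kafka");   // point lookup on the aggregate snapshot
    // view.range("a", "m") and view.all() cover range scans; windowed
    // aggregates are served via QueryableStoreTypes.windowStore()
)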
>
> Guozhang
>
>
> On Thu, Aug 18, 2016 at 6:40 AM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
>> Hi Guozhang,
>>
>> Hm... I hadn't thought of the repartitioning involvement.
>>
>> I'm not sure I understand completely, but I believe you're saying
>> the decision to process data this way is made before the data being
>> processed is available, because the partition *may* change whenever
>> the groupBy key *may* change.
>>
>> I still feel stuck with corrupted output appearing in the middle of
>> an aggregation.
>>
>> It's especially problematic for me when the updates to the source KTable
>> don't actually affect the results of the aggregation.  In the
>> word-count example in my original e-mail, this would be similar to
>> editing an unrelated field like "author" on an article: it doesn't
>> affect the groupBy or the aggregation, but it still temporarily
>> produces the wrong output (and causes inefficient processing).
>>
>> Are there any tools in Kafka Streams that might help me prevent
>> downstream calculations if the relevant inputs haven't changed?  I was
>> thinking I'd be able to use mapValues to pluck only relevant fields
>> out of a KTable, materialize a new KTable (.through) from that, and
>> then there'd be some state from which KS would be able to only invoke
>> downstream nodes if data has changed... but it doesn't seem to work
>> like that.
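
A minimal sketch of the approach described above, with illustrative names
(in the 0.10.0 DSL; Article is a hypothetical value type):

    KTable<String, Article> articles = builder.table("articles");
    KTable<String, String> categories = articles
            .mapValues(article -> article.category)   // pluck the relevant field
            .through("articles-by-category");         // materialize a new KTable
    // The hope was that the materialized KTable would only emit downstream
    // when the plucked value actually changes; as noted above, it does not.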
>>
>> Thanks so much for your responses Guozhang, I really appreciate your
>> time to help me out.
>>
>> Mathieu
>>
>>
>> On Wed, Aug 17, 2016 at 5:51 PM, Guozhang Wang  wrote:
>> > The problem is that Kafka Streams needs to repartition the streams
>> > based on the groupBy keys when doing aggregations.  For your case, the
>> > original stream may be read from a topic that is partitioned on "K",
>> > and you need to first repartition on "category" through an intermediate
>> > topic before the aggregation can be executed.
>> >
>> > Hence the old and new value may be sent to two different partitions of
>> > the intermediate topic, and hence be processed by two different
>> > processes (that won't be the case in your application, since you
>> > mentioned the "category" never changes).  Since the library cannot tell
>> > whether the groupBy key will ever change, it has to be conservative and
>> > do this subtract / add process when receiving the old / new value.
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Wed, Aug 17, 2016 at 1:45 PM, Mathieu Fenniak <
>> > mathieu.fenn...@replicon.com> wrote:
>> >
>> >> Hi Guozhang,
>> >>
>> >> Thanks for responding.  Ah, I see what you're saying... in the case of
>> >> an update to the KTable, the aggregator's subtractor result would be
>> >> necessary if the group-by key changes in the update.
>> >>
>> >> It makes sense, but unfortunately the behavior leaves me feeling a
>> >> little sketchy... when the group-by key doesn't change (which is
>> >> guaranteed in my case), I'm outputting results that don't correspond
>> >> at all to the inputs, temporarily.  It's immediately followed by a
>> >> corrected result.
>> >>
>> >> Would it be a feasible optimization to not send the subtractor's
>> >> result out of the aggregate in the case where the groupBy key does
>> >> not change between the old record and the new record?
>> >>
>> >> Mathieu
>> >>
>> >> On Wed, Aug 17, 2016 at 2:12 PM, Guozhang Wang wrote:
>> >> > Hello Mathieu,
>> >> >
>> >> > Note that semantics of KTable aggregations (i.e.
>> >> > "KTable.groupBy.aggregate" as in 0.10.0) and KStream aggregations
>> >> > (i.e. "KStream.aggregateByKey" as

Multiple Consumers From Same Group Assigned Same Partition

2016-08-24 Thread Nicholas Harezga
I am currently attempting to upgrade my software from Kafka 0.8.2 to 0.9. I
am trying to switch over to the new Consumer API to allow for rebalancing as
machines are added to or removed from our cluster. I am running into an issue
where the same partition on a topic is assigned to multiple consumers for a
short period of time when a machine is added to the group. This results in
some of the messages being processed more than once, while I am aiming for
exactly once. I followed the setup instructions in the Javadocs and use an
external data store to save offsets while consuming and when rebalancing.
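
For reference, the pattern from the Javadocs that I am following looks
roughly like this (saveOffset and readOffset stand in for the external
data store):

    consumer.subscribe(Collections.singletonList("assignments"),
        new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                for (TopicPartition tp : partitions)
                    saveOffset(tp, consumer.position(tp));  // persist externally
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                for (TopicPartition tp : partitions)
                    consumer.seek(tp, readOffset(tp));      // resume from the store
            }
        });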

In my test cluster I start initially with 2 machines consuming and a single 
producer. Everything works fine at the start and each consumer gets half of the 
partitions. When I add a third machine it is assigned a portion of the 
partitions but these partitions aren't revoked from one of the two initial 
machines. Below are some log statements from my program, hopefully they help 
illustrate my situation.

Partition 14 is initially assigned to machine 1. Machine 1 reads a number of 
messages before machine 3 is added. Partition 14 is assigned to machine 3 when 
started, but partition 14 was not revoked from machine 1. Both machines then 
read the same message at offset 3 before the system rebalances and both have 
access to partition 14 revoked. Machine 2 is then assigned partition 14 after 
it is revoked from machine 1 but is still assigned to machine 3. After it is 
revoked from machine 3, machine 2 is the only one with access to partition 14.

Machine 1 (Turned on at start)
2016-08-24 14:17:08 DEBUG KafkaStreamReader:351 - ASSIGNED: Assigning partition 
14 for topic assignments to worker with offset 0
2016-08-24 14:18:48 DEBUG KafkaStreamReader:312 - Committing topic assignments 
partition 14 offset 3
2016-08-24 14:19:38 DEBUG KafkaStreamReader:200 - partition = 14, offset = 3 
(Message read from kafka)
2016-08-24 14:19:38 DEBUG KafkaStreamReader:312 - Committing topic assignments 
partition 14 offset 4
2016-08-24 14:19:39 DEBUG KafkaStreamReader:338 - REVOKED: Committing for 
partition 14 of topic assignments offset 4

Machine 2 (Turned on at start)
2016-08-24 14:19:51 DEBUG KafkaStreamReader:351 - ASSIGNED: Assigning partition 
14 for topic assignments to worker with offset 4

Machine 3 (Turned on a few minutes later)
2016-08-24 14:19:21 DEBUG KafkaStreamReader:351 - ASSIGNED: Assigning partition 
14 for topic assignments to worker with offset 3
2016-08-24 14:19:48 DEBUG KafkaStreamReader:200 - partition = 14, offset = 3 
(Message read from kafka - already read by machine 1)
2016-08-24 14:20:00 DEBUG KafkaStreamReader:338 - REVOKED: Committing for 
partition 14 of topic assignments offset 4

My cluster is running Cloudera 5.7.0 with Kafka version 2.0.1-1.2.0.1.p0.5 
which corresponds to Kafka version 0.9.0.0+kafka2.0.1+283. 
(https://www.cloudera.com/documentation/kafka/latest/topics/kafka_packaging.html)

Can anyone help explain what I'm doing wrong here? If there is any further 
information I can provide to help this along please let me know and I will be 
happy to provide it if I can.


Re: KIP-33 Opt out from Time Based indexing

2016-08-24 Thread Jan Filipiak

Hey Jun,

I'll go and try again :) since I wrote the first one in quite a stressful
environment. The bottom line is that, for our use cases, I see too small a
use/effort ratio in this time index.
We do not bootstrap new consumers for key-less logs very frequently, and
when we do, they usually want everything (prod deployment) or just
start at the end (during development).
That caused quite some frustration. It would have been better if I could
just have turned it off and not bothered any more. Anyhow, in the meantime I
had to dig deeper into the inner workings,
and the impacts are not as dramatic as I initially assumed. But it still
carries along some oddities that I want to list here.


first odd thing:
Quote
---
Enforce time based log rolling

Currently time based log rolling is based on the creating time of the 
log segment. With this KIP, the time based rolling would be changed to 
based on the largest timestamp ever seen in a log segment. A new log 
segment will be rolled out if current time is greater than largest 
timestamp ever seen in the log segment + log.roll.ms. When 
message.timestamp.type=CreateTime, user should set 
max.message.time.difference.ms appropriately together with log.roll.ms 
to avoid frequent log segment roll out.


---
Imagine a MirrorMaker falls behind, with a delay of some time >
log.roll.ms.
From my understanding, when no one else is producing to this partition
except the MirrorMaker, the broker will start rolling on every append?
Just because you may be under a DoS attack and your application only works
from the remote location (also a good occasion for MM to fall behind).
But checking the default values indicates that it should indeed not
become a problem, as log.roll.ms defaults to 7 days.
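
To make the concern concrete, the roll condition quoted above boils down
to something like this (illustrative code, not the actual broker logic):

    // roll if: current time > largest timestamp in segment + log.roll.ms
    boolean shouldRoll(long nowMs, long largestTimestampMs, long logRollMs) {
        return nowMs > largestTimestampMs + logRollMs;
    }
    // A MirrorMaker lagging by more than log.roll.ms keeps appending records
    // whose CreateTime never catches up to "now", so the condition stays
    // true and, in the worst case, every append rolls a new segment.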



second odd thing:
Quote
---
A time index entry (T, offset) means that in this segment any
message whose timestamp is greater than T comes after offset.


The OffsetRequest behaves almost the same as before. If timestamp T is
set in the OffsetRequest, the first offset in the returned offset
sequence means that if the user wants to consume from T, that is the offset
to start with. The guarantee is that any message whose timestamp is
greater than T has a bigger offset, i.e. any message before this offset
has a timestamp < T.

---

Given how the index is maintained, with a little bit of bad luck
(rolling upgrade/config change of mirrormakers for different
colocations) one ends up with segmentN.timeindex.maxtimestamp >
segmentN+1.timeindex.maxtimestamp. If I am not overlooking something here,
the fetch code does not seem to take that into account:
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L604

In this case goal number 1 on the list, not to lose any messages, is not
achieved. An easy fix seems to be to sort the segments array by
maxtimestamp, but I can't wrap my head around it just now.



third odd thing:
Regarding the worry of increasing complexity. Looking at the code
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L193 (to 196)
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L227 (and 230)
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L265 (to 266)
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L305 (to 307)
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L408 (to 410)
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L432 (to 435)
https://github.com/apache/kafka/blob/05d00b5aca2e1e59ad685a3f051d2ab022f75acc/core/src/main/scala/kafka/log/LogSegment.scala#L104 (to 108)

and especially
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L717
it feels like the Log & LogSegment having detailed knowledge about
the maintained indexes is not the ideal way to model the problem.
Having the server maintain a set of indexes could reduce the code
complexity, while also allowing an easy switch to turn them off. I think
both indexes could point to the physical position; a client would do
fetch(timestamp) and then continue with the offsets as usual. Is there
any specific reason the timestamp index points into the offset index?
For reading, one would need to branch earlier, maybe already in the
ApiHandler, and decide which indexes to query, but this branching logic
is there now anyhow.
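
To spell out the two lookup models (hypothetical names, just to illustrate):

    // Today, two hops: timestamp -> offset via the time index, then
    // offset -> physical file position via the offset index.
    long offset = timeIndex.lookup(targetTimestamp);
    long filePosition = offsetIndex.lookup(offset);

    // Suggested alternative: each index points straight at the physical
    // position, so a fetch(timestamp) needs only one hop.
    long filePositionDirect = timeIndex.lookupPosition(targetTimestamp);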


Further, I also can't think of a situation where one would want this
log.message.timestamp.difference.max.ms setting to take effect. I think it
defeats goal 1 again.


ITE having this index in the brokers now feels weird to me. Gives me a

Re: Questions about Apache Kafka

2016-08-24 Thread Marko Bonaći
Hi Karin,
regarding 5 (fsyncing to disk), take a look at the broker configuration
parameters whose names start with log.flush.

http://kafka.apache.org/documentation.html#brokerconfigs
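
For example, to force an fsync after a bounded number of messages or a
bounded interval (values illustrative; by default both are unset and
flushing is left to the OS):

    # in server.properties: fsync after every N messages ...
    log.flush.interval.messages=10000
    # ... or after at most this many milliseconds
    log.flush.interval.ms=1000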

Marko Bonaći
Monitoring | Alerting | Anomaly Detection | Centralized Log Management
Solr & Elasticsearch Support
Sematext  | Contact


On Wed, Aug 24, 2016 at 1:29 PM, Herwerth Karin (CI/OSW3) <
karin.herwe...@de.bosch.com> wrote:

> Dear Sir or Madam,
>
> I'm a beginner with Apache Kafka and have some questions that I couldn't
> answer from the documentation.
>
>
> 1. If you set a Time To Live on a topic and on a message, which Time To
> Live takes priority?
>
> 2. Does Apache Kafka support a mechanism to stop the publisher if the
> subscriber is too slow?
>
> 3. Is there a possibility of a script engine? Does Kafka support the
> implementation and execution of self-made scripts?
>
> 4. Is there a possibility to join topics or partitions?
>
> 5. Does Kafka support the following mechanism: a mechanism to control
> whether each write to the store will also call sync on the file system to
> ensure all data is written to the disk?
>
>
>
> I hope I will get support.
>
>
>
> Thanks in advance.
>
>
> Mit freundlichen Grüßen / Best regards
>
> Karin Herwerth
>
> EAI Development (CI/OSW3)
> Robert Bosch GmbH | Postfach 30 02 20 | 70442 Stuttgart | GERMANY |
> www.bosch.com
>


Re: Questions about Apache Kafka

2016-08-24 Thread Ian Wrigley
Actually, for 3 and 4 Kafka Streams is also a very good option — and comes as 
part of the standard Apache Kafka distribution.

Ian.

---
Ian Wrigley
Director, Education Services
Confluent, Inc

> On Aug 24, 2016, at 2:37 PM, David Garcia  wrote:
> 
> Regarding 3 and 4: https://calcite.apache.org/docs/stream.html  (i.e. 
> streaming SQL queries)
> 
> 
> On 8/24/16, 6:29 AM, "Herwerth Karin (CI/OSW3)"  
> wrote:
> 
> Dear Sir or Madam,
> 
> I'm a beginner with Apache Kafka and have some questions that I couldn't
> answer from the documentation.
> 
> 
> 1. If you set a Time To Live on a topic and on a message, which Time To
> Live takes priority?
> 
> 2. Does Apache Kafka support a mechanism to stop the publisher if the
> subscriber is too slow?
> 
> 3. Is there a possibility of a script engine? Does Kafka support the
> implementation and execution of self-made scripts?
> 
> 4. Is there a possibility to join topics or partitions?
> 
> 5. Does Kafka support the following mechanism: a mechanism to control
> whether each write to the store will also call sync on the file system to
> ensure all data is written to the disk?
> 
> 
> 
>I hope I will get support.
> 
> 
> 
>Thanks in advance.
> 
> 
>Mit freundlichen Grüßen / Best regards
> 
>Karin Herwerth
> 
>EAI Development (CI/OSW3)
>Robert Bosch GmbH | Postfach 30 02 20 | 70442 Stuttgart | GERMANY | 
> www.bosch.com
> 