Re: Maintaining message ordering using KafkaSpout/Bolt

2016-06-05 Thread Matthias J. Sax
Hi Kanagha, For reading, KafkaSpout's internally used KafkaConsumer ensures that data is received in order per partition. Because the spout might read multiple partitions and emit only a single (logical) output stream, data from multiple partitions interleaves within this output stream (the

Re: Maintaining message ordering using KafkaSpout/Bolt

2016-06-05 Thread Matthias J. Sax
just use fieldsGrouping on the key after your spout to preserve the order per key. -Matthias On 06/05/2016 11:26 AM, Matthias J. Sax wrote: > Hi Kanagha, > > For reading, KafkaSpout's internally used KafkaConsumer ensures that > data is received in order per partition. Because the spou
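
A minimal sketch of the suggested wiring (Storm 1.x API; the spout instance, its declared "key" output field, and the bolt class are illustrative assumptions, not from the thread):

    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;

    TopologyBuilder builder = new TopologyBuilder();
    // kafkaSpout: a pre-configured org.apache.storm.kafka.KafkaSpout (assumed to emit a "key" field)
    builder.setSpout("kafka-spout", kafkaSpout, 1);
    // fieldsGrouping routes all tuples with the same "key" value to the same bolt task,
    // so per-key order is preserved downstream of the spout
    builder.setBolt("per-key-bolt", new MyOrderSensitiveBolt(), 4)
           .fieldsGrouping("kafka-spout", new Fields("key"));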

Re: Maintaining message ordering using KafkaSpout/Bolt

2016-06-05 Thread Matthias J. Sax
> Does the parallelism_hint set when a KafkaSpout is added to a topology > need to match the number of partitions in a topic? No. On 06/05/2016 11:26 AM, Matthias J. Sax wrote: > Hi Kanagha, > > For reading, KafkaSpout's internally used KafkaConsumer ensures that > data is

Re: KafkaStream and Kafka consumer group

2016-06-11 Thread Matthias J. Sax
Do you instantiate KafkaProducer in your user code? Why not use KStream.to("topic-name")? -Matthias On 06/10/2016 12:28 AM, Saeed Ansari wrote: > Thank you Eno, > Adding more threads greatly increased the throughput of the stream. As I said > after processing I send the event to another topic.
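
The DSL pattern referred to here looks roughly like this (0.10-era API; topic names and the transform() helper are placeholders):

    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> events = builder.stream("input-topic");
    events.mapValues(value -> transform(value))  // transform(): stand-in for the user's processing
          .to("output-topic");                   // written by the library's own producer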

Re: Track progress of kafka stream job

2016-06-03 Thread Matthias J. Sax
streaming app we are working on. > We'll most likely stick to DSL for that. > Does the DSL expose any stat or debug info? Or any way to access the > underlying Context? > > Srikanth > > On Thu, Jun 2, 2016 at 9:30 AM, Matthias J. Sax <matth...@confluent.io> > wrote:

Re: KafkaStream and Kafka consumer group

2016-06-13 Thread Matthias J. Sax
nto it. So I use KafkaConsumer instead. >> >> On Sat, Jun 11, 2016 at 7:13 AM, Matthias J. Sax <matth...@confluent.io> >> wrote: >> >>> Do you instantiate KafkaProducer in your user code? >>> >>> Why not use KStream.to("topic-name")? >>>

Re: Kafka Streams / Processor

2016-05-30 Thread Matthias J. Sax
Regards > Toby > > > >> On 27 May 2016, at 1:32 AM, Matthias J. Sax <matth...@confluent.io> wrote: >> >> Hi Toby, >> >> 1) I am not an expert for RocksDB, but I don't see a problem with larger >> objects. >> >> 2) I assume,

Re: Kafka Streams / Processor

2016-05-26 Thread Matthias J. Sax
Hi Toby, 1) I am not an expert for RocksDB, but I don't see a problem with larger objects. 2) I assume, by "guaranteed" you mean that the commit is performed when the call returns. In this case, no. It only sets a flag to commit at the next earliest point in time possible. I.e., you can trigger

Re: Create KTable from two topics

2016-06-02 Thread Matthias J. Sax
Hi Srikanth, your third approach seems to be the best fit. It uses only one shuffle of the data (which you cannot prevent in any case). If you want to put everything into a single application, you could use a "dummy" custom aggregation to convert the KStream into a KTable instead of writing into
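
A sketch of the "dummy" aggregation trick (0.10.1-era API; the stream type and store name are assumptions):

    // keep only the latest value per key -- this effectively turns the KStream into a KTable
    KTable<String, String> table = stream
        .groupByKey()
        .reduce((oldValue, newValue) -> newValue, "latest-value-store");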

Re: Track progress of kafka stream job

2016-06-02 Thread Matthias J. Sax
Hi Srikanth, I am not exactly sure if I understand your question correctly. One way to track the progress is to get the current record offset (you can obtain it in the low-level Processor API via the provided Context object). Otherwise, on commit, all writes to intermediate topics are flushed

Re: Create KTable from two topics

2016-06-02 Thread Matthias J. Sax
I would not expect a performance difference. -Matthias On 06/02/2016 06:15 PM, Srikanth wrote: > In terms of performance there is not going to be much difference to+table > vs through+aggregateByKey, right? > > Srikanth > > > On Thu, Jun 2, 2016 at 9:21 AM, Matthias J. Sax

Re: About null keys

2016-06-16 Thread Matthias J. Sax
One small addition: If your input topic does not have any key specified, you can still extract a key from the value: KTable source = builder.stream(...).selectKey(...).toTable(...) -Matthias On 06/15/2016 08:30 PM, Eno Thereska wrote: > Hi Adrienne, > > How do you enter the input on t1 topic?

Re: Wordcount with reduce

2016-06-19 Thread Matthias J. Sax
Can you show the full stack trace? How do you ingest the data into the topic? I also think you should read the topic as a KStream (instead of a KTable). What de-/serializer do you specify in props? (see http://docs.confluent.io/3.0.0/streams/developer-guide.html#streams-developer-guide-serdes)
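
The serde configuration the linked guide covers is set via StreamsConfig, roughly like this (0.10.0-era config keys; application id and broker address are placeholders):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());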

Re: Kafka streams for out of order density aggregation

2016-06-24 Thread Matthias J. Sax
I just want to add something: If I understand the question correctly, you are asking for a strong ordering guarantee. I personally doubt that out-of-order on count-based windows can be supported with strong consistency guarantee in an efficient manner. If a late record arrives for window X, the

Re: Question on Java client consumer.seek()

2016-04-06 Thread Matthias J. Sax
Sounds reasonable to me. On 04/05/2016 05:56 PM, Mario Ricci wrote: > I found this > thread > and see that poll() must be called before seek(). > > This

Re: kafka streams internal topic

2016-05-20 Thread Matthias J. Sax
Hi Srikanth, I basically agree on (1). We are still working on configuration options for Kafka Streams. For (2), you would get an error. If the number of partitions is not the same, the join cannot be computed. There is already a ticket to insert a re-partitioning step automatically, in case

Re: Kafka stream join scenarios

2016-05-21 Thread Matthias J. Sax
Hi Srikanth, 1) there is no support on DSL level, but if you use Processor API you can do "anything" you like. So yes, a map-like transform() that gets initialized with the "broadcast-side" of the join should work. 2) Right now, there is no way to stall a stream -- a custom TimestampExtractor

Re: KafkaStreams / Processor API - How to retrieve offsets

2016-05-23 Thread Matthias J. Sax
Hi, you can use "ProcessorContext" that is provided via init(). The context object is dynamically adjusted to the currently processed record. To get the offset, use "context.offset()". -Matthias On 05/22/2016 10:38 PM, Florian Hussonnois wrote: > Hi everyone, > > Is there any way to retrieve
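
A sketch of such a processor (0.10-era Processor API; the class name is illustrative):

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class OffsetAwareProcessor implements Processor<String, String> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) { this.context = context; }

        @Override
        public void process(String key, String value) {
            // offset()/topic()/partition() refer to the record currently being processed
            long offset = context.offset();
            context.forward(key, value);
        }

        @Override
        public void punctuate(long timestamp) { }

        @Override
        public void close() { }
    }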

Re: Kafka Streams change log behavior

2016-05-23 Thread Matthias J. Sax
Hi Manu, Yes. If a StreamTask recovers, it will write to the same change log's topic partition. Log compaction is enabled by default for those topics. You still might see some duplicates in your output. Currently, Kafka Streams guarantees at-least-once processing (exactly-once processing is on

Re: Kafka Streams change log behavior

2016-05-23 Thread Matthias J. Sax
On 05/23/2016 10:48 AM, Manu Zhang wrote: > Thanks Matthias. Is there a way to allow users to read change logs from a > previous application? > > On Mon, May 23, 2016 at 3:57 PM Matthias J. Sax <matth...@confluent.io> > wrote: > >> Hi Manu, >> >> Yes. If a Stre

Re: Kafka Streams: KStream - KTable Left Join

2016-05-04 Thread Matthias J. Sax
Hi, I am still new to Kafka Streams myself, but from my understanding, if you change the key, your partitioning changes, i.e., it is not valid anymore. Thus, the joins (which assume co-located data) cannot be performed (this is the reason why sources get set to null). You can write to an

Re: Kafka Streams: KStream - KTable Left Join

2016-05-04 Thread Matthias J. Sax
+1 I had the same thought and put it on my personal agenda already. -Matthias On 05/04/2016 06:37 PM, Jay Kreps wrote: > Is it possible to make the error message give more of an explanation? > > -Jay > > On Wed, May 4, 2016 at 8:46 AM, Matthias J. Sax <matth...@confluent.io>

Re: Kafka Streams: finding a solution to a particular use case

2016-04-20 Thread Matthias J. Sax
Log compaction can also delete keys if the payload for a key is null: "Compaction also allows for deletes. A message with a key and a null payload will be treated as a delete from the log. This delete marker will cause any prior message with that key to be removed (as would any new message with
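
In producer terms, such a delete marker is just a record with a null value (topic and key are placeholders; producer is an existing KafkaProducer<String, String>):

    import org.apache.kafka.clients.producer.ProducerRecord;

    // a null payload acts as a tombstone on a log-compacted topic
    producer.send(new ProducerRecord<String, String>("compacted-topic", "key-to-delete", null));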

Re: Kafka Streams multi-node

2016-07-26 Thread Matthias J. Sax
David's answer is correct. Just start the same application multiple times on different nodes and the library does the rest for you. Just one addition: as Kafka Streams is for standard application development, there is no need to run the application on the same nodes as your brokers are running

Re: Kafka Streams multi-node

2016-07-26 Thread Matthias J. Sax
Correct me if I am wrong. > > Thanks > Davood > > On Tue, Jul 26, 2016 at 9:44 PM, Matthias J. Sax <matth...@confluent.io> > wrote: > >> David's answer is correct. Just start the same application multiple >> times on different nodes and the library does the

Re: KTable DSL join

2016-07-14 Thread Matthias J. Sax
gave in my first email. > > Table1 > 111 -> aaa > 222 -> bbb > 333 -> aaa > > Here value is not unique (aaa). So, I can't just make it a key. 333 will > then override 111. > > Srikanth > > On Thu, Jul 14, 2016 at 11:07 AM, Matthias J. Sax <matth..

Re: KTable DSL join

2016-07-14 Thread Matthias J. Sax
We perform the join after re-partitioning. Although we > re-partitioned with value, the key doesn't change. > KTable joins always use keys and the ValueJoiner gets values from both tables > when keys match. > > Having data co-located will not be sufficient, right? > > Srikanth > > On Thu,

Re: KTable DSL join

2016-07-14 Thread Matthias J. Sax
table1.toStream().through(ValueBasedPartitioner).process(customProcessor) > > I'm not sure if creating a KTable inside a Processor like this is > semantically correct. > But at least both tables are co-partitioned and we don't have to replicate. > > Srikanth > > On Thu, J

Re: Kafka Streams: Merging of partial results

2016-07-21 Thread Matthias J. Sax
Is that documentation/blog > post still coming? Is there anything I can read now about how at-least-once > delivery is guaranteed by Kafka Streams? > > Cheers, > Michael-Keith > > > From: Matthias J. Sax <matth...@confluent.io> > Sent: Thursday, July 21, 2016 7:31 AM >

Re: KTable and Rebalance Operations

2016-08-02 Thread Matthias J. Sax
Hi David, on startup of the second application instance, the KTable is effectively partitioned into two distinct partial KTables, each holding the key-value pairs for their corresponding assigned partitions. Thus, your "lookups" on each instance can only access the key-value pairs for the set

Re: Kafka consumer

2016-08-11 Thread Matthias J. Sax
You need to commit messages either manually or enable auto commit via "auto.commit.enable = true". -Matthias On 08/10/2016 07:34 PM, Auduru Suresh wrote: > Hi , > > We've written a Kafka consumer, and on each restart it reads messages from > the start of the partition. Please help me with how we can
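
Note that for the 0.9+ Java consumer the auto-commit key is spelled "enable.auto.commit". A manual-commit sketch (broker, group, topic, and the handle() helper are placeholders):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "my-group");
    props.put("enable.auto.commit", "false");  // commit manually instead
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("my-topic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            handle(record);  // user processing (hypothetical)
        }
        consumer.commitSync();  // on restart, consumption resumes after the committed offsets
    }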

Re: KStream-to-KStream Join Example

2016-07-14 Thread Matthias J. Sax
Hi, Both streams need to be co-partitioned, i.e., if you change the key of one join input, you need to re-partition this stream on the new key via .through(). You should create the topic you use in through() manually, before you start your Kafka Streams application. (For future release, this
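
A sketch of the re-keying step (types and topic name are illustrative; the through() topic must be created up front with the same partition count as the other join input):

    KStream<String, Order> rekeyed = orders
        .map((oldKey, order) -> new KeyValue<>(order.getCustomerId(), order))
        .through("orders-by-customer");  // manually created re-partitioning topic
    // 'rekeyed' is now co-partitioned with the other stream and can be joined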

Re: KTable DSL join

2016-07-14 Thread Matthias J. Sax
I would recommend re-partitioning as described in Option 2. -Matthias On 07/13/2016 10:53 PM, Srikanth wrote: > Hello, > > I'm trying the following join using KTable. There are two change log tables. > Table1 > 111 -> aaa > 222 -> bbb > 333 -> aaa > > Table2 > aaa -> 999 > bbb -> 888

Re: Kafka Streams: Merging of partial results

2016-07-21 Thread Matthias J. Sax
Hi, you answered your question absolutely correctly by yourself (i.e., the internal topic created in groupBy() repartitions the data on words). I cannot add more to explain how it works. You might want to have a look here for more details about Kafka Streams in general:

Re: Kafka Streams: KTable join + filter + foreach

2016-07-20 Thread Matthias J. Sax
>> mathieu.fenn...@replicon.com> wrote: >>> >>>> Hm... OK, I think that makes sense. >>>> >>>> It seems like I can't filter out those tombstone records; is that >>> expected >>>> as well? If I throw in a .fil

Re: Chaining custom processors with DSL

2016-07-18 Thread Matthias J. Sax
Sure. You can use process(), transform(), or transformValues() on a KStream for general UDFs. See http://docs.confluent.io/3.0.0/streams/developer-guide.html#stateful-transformations -Matthias On 07/17/2016 11:56 PM, Srikanth wrote: > Hello, > > Using the low level API its possible to chain
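
For example, a value-only UDF can be hooked into the DSL via transformValues() (input stream and types are illustrative):

    KStream<String, Integer> lengths = input.transformValues(
        () -> new ValueTransformer<String, Integer>() {
            private ProcessorContext context;
            @Override public void init(ProcessorContext context) { this.context = context; }
            @Override public Integer transform(String value) { return value.length(); }
            @Override public Integer punctuate(long timestamp) { return null; }  // no periodic output
            @Override public void close() { }
        });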

Re: Null Output topic for Kafka Streams

2016-07-18 Thread Matthias J. Sax
You can use KStream#foreach() as last operator. -Matthias On 07/18/2016 06:50 PM, David Garcia wrote: > I would like to process messages from an input topic, but I don’t want to > send messages downstream…with KStreams. (i.e. I would like to read from a > topic, do some processing including
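
A minimal sketch (topic name and the sideEffect() helper are placeholders):

    builder.stream("input-topic")
           .foreach((key, value) -> sideEffect(key, value));  // terminal -- nothing flows downstream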

Re: Questions relating KStream-KTable join with Kafka-Streams

2016-07-19 Thread Matthias J. Sax
Hi Nicolas, you are right, it is currently not possible to get a result from a KTable update (and this is by design). The idea is that the KStream is enriched with the *current state* of the KTable -- thus, for each KStream record a look-up in the KTable is done. (In this sense, a KStream-KTable join
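
The enrichment pattern described here, roughly (0.10.1-era API; topics, store name, and the value types are illustrative):

    KTable<String, Profile> profiles = builder.table("profiles-topic", "profiles-store");
    KStream<String, Click> clicks = builder.stream("clicks-topic");
    // each click record is joined against the *current* profile for its key
    KStream<String, EnrichedClick> enriched =
        clicks.leftJoin(profiles, (click, profile) -> new EnrichedClick(click, profile));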

Re: Kafka Streams: KTable join + filter + foreach

2016-07-20 Thread Matthias J. Sax
Hi Mathieu, join semantics are tricky. We are still working on better documentation for it... For the current state and your question: Each time a record is processed, it looks up the other KTable to see if there is a matching record. If none is found, the join result is empty and a tombstone

Re: Kafka Streaming Question : reset offset

2016-07-20 Thread Matthias J. Sax
Hi Pari, currently, changing the application ID is the best way to follow. Cleaning up the application state correctly, is a little bit tricky. We are currently working on an improvement for this -- should be available soon. See https://issues.apache.org/jira/browse/KAFKA-3185 -Matthias On

Re: Subscribe to kafka user email group.

2016-07-06 Thread Matthias J. Sax
You need to send an email to users-subscr...@kafka.apache.org See https://kafka.apache.org/contact.html -Matthias On 07/06/2016 10:26 PM, Manoj Chaudhary wrote: > Thanks > Manoj

Re: Kafka Streams : Old Producer

2016-07-07 Thread Matthias J. Sax
Hi Vivek, Kafka Streams works only with Kafka 0.10 (but not with 0.9). I am not aware of any workaround to allow for 0.9 usage. -Matthias On 07/07/2016 05:37 AM, vivek thakre wrote: > Can kafka streams library work with the messages produced by 0.9.0.1 > producer? > I guess not since the old

Re: Kafka Streams : Old Producer

2016-07-08 Thread Matthias J. Sax
>> with a Kafka 0.9 broker (which it cannot). The question is whether Kafka >>> Streams can process messages produced with a 0.9.0.1 producer into a >>> 0.10.0.0 broker. Is that right? If so, would a custom TimestampExtractor >>> work? >>> >>> Ismael >&

Re: streaming-enabled SQL in Kafka Streams?

2016-06-30 Thread Matthias J. Sax
Hi Alex, we do have a SQL layer on the long-term roadmap (also considering Calcite). Thanks! -Matthias On 06/30/2016 09:41 AM, Alex Glikson wrote: > Did folks consider adding support in Kafka Streams for Apache Calcite [1], > for streaming-enabled SQL (potentially on top of existing DSL)?

Re: Question about bootstrap processing in KafkaStreams.

2016-06-29 Thread Matthias J. Sax
Hi, there was a similar discussion on the list already "Kafka stream join scenario": http://search-hadoop.com/m/uyzND1WsAGW1vB5O91=Kafka+stream+join+scenarios Long story short: there is no explicit support or guarantee. As Jay mentioned, some alignment is best effort. However, the main issues

Re: Streams RocksDB State Store Disk Usage

2016-06-29 Thread Matthias J. Sax
One thing I want to add: If you use window operations, windows are kept until their retention time expires. Thus, reducing the retention time should decrease the memory RocksDB needs to preserve windows. See

Re: Handling of uncommitted messages

2016-07-01 Thread Matthias J. Sax
http://www.confluent.io/blog/hands-free-kafka-replication-a-lesson-in-operational-simplicity/ From my understanding, the watermark will not advance until both failed follower brokers have recovered and caught up. -Matthias On 07/01/2016 12:17 PM, Vikas Kumar wrote: > Hi, > > I have a question about

Re: kafka-streams ktable recovery after rebalance crash

2017-02-01 Thread Matthias J. Sax
Not sure why the locks on the state directory did not get released (maybe because of the crash) -- what version do you use? We fixed a couple of bugs in this regard lately -- maybe it's fixed in upcoming 0.10.2. For now, you might want to delete the whole state directory (either manually or by calling
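
The programmatic clean-up that came out of KAFKA-3185 looks roughly like this (0.10.1+; builder and props are assumed to exist):

    KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
    streams.cleanUp();  // wipes this application's local state directory; only valid while stopped
    streams.start();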

Re: "End of Batch" event

2017-02-01 Thread Matthias J. Sax
SourceTask. > > Currently, I'm representing all CSV records in one KStream (adding the source > to each record). But I can represent them as separate KStreams if needed. > Are you suggesting windowing these KStreams with a 24-hour window and then > merging them? > > > > >

Re: "End of Batch" event

2017-02-01 Thread Matthias J. Sax
has to be so complex... Kafka can be configured > to delete items older than 24h in a topic. So if you want to get rid > of records that did not arrive in the last 24h, just configure the > topic accordingly? > > On Wed, Feb 1, 2017 at 2:37 PM, Matthias J. Sax <matth...@confluent.io&

Re: Kafka Streams punctuate with slow-changing KTables

2017-02-01 Thread Matthias J. Sax
Presumably my first pass processor can still output new > dimension entries to the topic that the table is backed by? Again for > "find or create". > > On 1 February 2017 at 19:21, Matthias J. Sax <matth...@confluent.io> wrote: > >> Thanks! >> >> Abou

Re: Kafka Streams delivery semantics and state store

2017-02-02 Thread Matthias J. Sax
efinitely. An easy solution to >> removing old records would be to access committed offset and delete all >> entries before it, but I did not find an easy way to access the committed >> offset. >> >> Is my thinking correct here? How could I maintain such state store

Re: Kafka Streams delivery semantics and state store

2017-02-03 Thread Matthias J. Sax
If you could verify > whether what I wrote above is correct. > > Kind Regards > Krzysztof Lesniewski > > On 03.02.2017 01:06, Matthias J. Sax wrote: >> Your assumption is not completely correct. >> >> After a crash and State Store restore, the store will contain ex

[DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-02-03 Thread Matthias J. Sax
Hi All, I prepared a KIP to clean up some of Kafka's Streams API. Please have a look here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-120%3A+Cleanup+Kafka+Streams+builder+API Looking forward to your feedback! -Matthias

Kafka docs for current trunk

2017-01-31 Thread Matthias J. Sax
Hi, I want to collect feedback about the idea to publish docs for the current trunk version of Apache Kafka. Currently, docs are only published for official releases. Other projects also have docs for the current SNAPSHOT version. So the question arises whether this would be helpful for the Kafka community, too.

Re: "End of Batch" event

2017-01-31 Thread Matthias J. Sax
inside the SourceTask to get a > snapshot of what is currently in K for a specific source S, then I can send > delete messages for the missing items by subtracting the latest items in the CSV > from the items of that source in K. > > Thanks, > > On Tue, Jan 31, 2017 at 1:54 PM, Matthias J. Sax

Re: Kafka Streams punctuate with slow-changing KTables

2017-02-01 Thread Matthias J. Sax
Yes and no. It does not depend on the number of tuples but on the timestamps of the tuples. I would assume that records in the high-volume stream have timestamps that are only a few milliseconds from each other, while for the low-volume KTable, records have timestamp differences that are much

Re: Kafka Streams punctuate with slow-changing KTables

2017-02-01 Thread Matthias J. Sax
One thing to add: There are plans/ideas to change punctuate() semantics to "system time" instead of "stream time". Would this be helpful for your use case? -Matthias On 2/1/17 9:41 AM, Matthias J. Sax wrote: > Yes and no. > > It does not depend on the number of
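
For context, punctuation is requested in the Processor API roughly as follows; the timestamp passed to punctuate() currently advances with record timestamps ("stream time"), not with the wall clock:

    public class PeriodicProcessor implements Processor<String, String> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            context.schedule(60 * 1000L);  // request punctuate() about once per minute of stream time
        }

        @Override
        public void process(String key, String value) { /* per-record logic */ }

        @Override
        public void punctuate(long timestamp) {
            // driven by stream time -- a low-volume input advances it only rarely,
            // which is exactly the issue discussed in this thread
        }

        @Override
        public void close() { }
    }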

Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-02-06 Thread Matthias J. Sax
" belong in two different levels of the hierarchy. They both > transform two (or more) streams into one. > > Gwen > > On Fri, Feb 3, 2017 at 3:33 PM, Matthias J. Sax <matth...@confluent.io> wrote: >> Hi All, >> >> I did prepare a KIP to do some cleanup som

Re: KIP-121 [Discuss]: Add KStream peek method

2017-02-06 Thread Matthias J. Sax
Steven, Thanks for your KIP. I move this discussion to dev mailing list -- KIPs need to be discussed there (and can be cc'ed to user list). Can you also add the KIP to the table "KIPs under discussion":

Re: KIP-121 [Discuss]: Add KStream peek method

2017-02-08 Thread Matthias J. Sax
, >>>>>> >>>>>> I like the proposal, thank you. I have found it frustrating myself >> not to >>>>>> be able to understand simple things, like how many records have been >>>>>> currently processed. The peek method would allow t

Re: KTable and cleanup.policy=compact

2017-02-08 Thread Matthias J. Sax
, however your topics will grow larger than >> necessary. >> >> Eno >> >>> On 8 Feb 2017, at 18:56, Jon Yeargers <jon.yearg...@cedexis.com> wrote: >>> >>> What are the ramifications of failing to do this? >>> >>> On Tue,

Re: Kafka Streams: How to best maintain changelog indices using the DSL?

2017-02-08 Thread Matthias J. Sax
It's a difficult problem. And before we discuss deeper, a follow-up question: if you map from the old key to new_key, is this mapping "unique", or could it be that two different k/v-pairs map to the same new_key? If there are overlaps, you end up with a different problem than if there are no overlaps,

Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-08 Thread Matthias J. Sax
I am not sure about --reset-plus and --reset-minus. Would this skip n messages forward/backward for each partition? -Matthias On 2/8/17 2:23 PM, Jorge Esteban Quilcate Otoya wrote: > Great. I think I got the idea. What about these options: > > Scenarios: > > 1. Current status > >

Re: KIP-121 [VOTE]: Add KStream peek method

2017-02-08 Thread Matthias J. Sax
+1 On 2/8/17 4:51 PM, Gwen Shapira wrote: > +1 (binding) > > On Wed, Feb 8, 2017 at 4:45 PM, Steven Schlansker > wrote: >> Hi everyone, >> >> Thank you for constructive feedback on KIP-121, >> KStream.peek(ForeachAction) ; >> it seems like it is time to call a

Re: Kafka Streams: How to best maintain changelog indices using the DSL?

2017-02-08 Thread Matthias J. Sax
as each index key may > have a list of entities it maps to. For non-unique fields where an index > key may map to thousands of entities, it is not practical to maintain them > in a single aggregation. > > Any further guidance would be greatly appreciated. Thank you! >

Re: Kafka Streams delivery semantics and state store

2017-02-06 Thread Matthias J. Sax
would be idempotent and we could still > produce outcome for messages (a) and (b). It adds however extra > complexity - we need to maintain the map over time by deleting entries > older than committed offset. > > What do you think Matthias? > > Kind Regards > Krzysztof

Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-02-06 Thread Matthias J. Sax
native approach with this new API. >> >> KStreamBuilder.newName -- Similar to addInternalTopic, I use this to create >> processor names in reusable components. Lacking this method would be >> fairly easy to work around. >> >> Mathieu >> >> >>

Re: Kafka Streams: Is automatic repartitioning before joins public/stable API?

2017-02-07 Thread Matthias J. Sax
Yes, you can rely on this. The feature was introduced in Kafka 0.10.1 and will stay like this. We already updated the JavaDocs (for upcoming 0.10.2, which is going to be released in the next weeks) to explain this, too. See https://issues.apache.org/jira/browse/KAFKA-3561 -Matthias On 2/7/17

Re: KTable and cleanup.policy=compact

2017-02-07 Thread Matthias J. Sax
Yes, that is correct. -Matthias On 2/7/17 6:39 PM, Mathieu Fenniak wrote: > Hey kafka users, > > Is it correct that a Kafka topic that is used for a KTable should be set to > cleanup.policy=compact? > > I've never noticed until today that the KStreamBuilder#table() > documentation says:

Fwd: Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-02-04 Thread Matthias J. Sax
cc'ed from dev Forwarded Message Subject: Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API Date: Sat, 4 Feb 2017 11:30:46 -0800 From: Matthias J. Sax <matth...@confluent.io> Organization: Confluent Inc To: d...@kafka.apache.org I think the right pattern

Re: Kafka Streams punctuate with slow-changing KTables

2017-02-01 Thread Matthias J. Sax
Until the decision is made regarding the timing, would it be best to ignore > `punctuate` entirely and trigger everything message by message via > `process`? > > On 1 February 2017 at 17:43, Matthias J. Sax <matth...@confluent.io> wrote: > >> One thing to add: >> >>

Re: Kafka Streams delivery semantics and state store

2017-02-02 Thread Matthias J. Sax
Hi, About message acks: writes will be acked, however asynchronously (not synchronously as you describe it). Only before an actual commit, KafkaProducer#flush is called and all not-yet-received acks are collected (i.e., blocking/sync) before the commit is done. About state guarantees: there are none -- state might

Re: "End of Batch" event

2017-01-31 Thread Matthias J. Sax
items from that source that > don't exist in the latest CSV file. > > I was thinking of using an "End of Batch" message to initiate that process. I > might need to do the clean-up as part of the Connect code instead, or is there > a better way of doing that? > > Thanks, > Eric

Re: Table a KStream

2017-01-24 Thread Matthias J. Sax
If your data is already partitioned by key, you can save writing to a topic by doing a dummy reduce instead:

    stream.groupByKey()
          .reduce(new Reducer<V>() {
              public V apply(V value1, V value2) {
                  return value2;
              }
          }, "yourStoreName");

(replace V with your actual value type) -Matthias

Re: [DISCUSS] KIP-114: KTable materialization and improved semantics

2017-01-30 Thread Matthias J. Sax
implies materialisation). So in the >>>>>> example of >>>>>> KTable::filter, if you call >>>>>> getIQHandle on both tables, only the one source that is there would >>>>>> materialize, and the QueryHandle abstraction would make s

Re: Understanding output of KTable->KTable join

2017-01-30 Thread Matthias J. Sax
If you join two KTables, one-to-many joins are currently not supported (only one-to-one, i.e., primary-key joins). In upcoming 0.10.2 there will be global KTables that allow something similar to one-to-many joins -- however, only for KStream-GlobalKTable joins, so not sure if this can help you. About

Re: "End of Batch" event

2017-01-29 Thread Matthias J. Sax
Hi, currently, a Kafka Streams application is designed to "run forever" and there is no notion of "End of Batch" -- we have plans to add this though... (cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams) Thus, right now you need to

Re: [DISCUSS] KIP-114: KTable materialization and improved semantics

2017-01-30 Thread Matthias J. Sax
@trivago.com> wrote: >>>> >>>> Hi Exactly >>>> >>>> I know it works from the Processor API, but my suggestion would prevent >>>> DSL users dealing with storenames what so ever. >>>> >>>> In general I am pro switching betwee

Re: At Least Once semantics for Kafka Streams

2017-01-30 Thread Matthias J. Sax
Hi, yes, all examples have at-least-once semantics because this is the only "mode" Kafka Streams supports -- you cannot "disable" it. (btw: we are currently working on exactly-once for Streams, which you will be able to turn on/off). There is not much documentation about how it works internally,

Re: How to log/analyze the consumer lag in kafka streaming application

2017-01-26 Thread Matthias J. Sax
You should check out Kafka Streams metrics (for upcoming 0.10.2 they are even more detailed). There is not a lot of documentation for 0.10.0 or 0.10.1, but they work the same way as the consumer/producer metrics that are documented. -Matthias On 1/24/17 10:38 PM, Sachin Mittal wrote: > Hi All, >

Re: Fwd: [DISCUSS] KIP-114: KTable materialization and improved semantics

2017-01-27 Thread Matthias J. Sax
new method help? >>>> >>>> I cannot see the reason for the additional materialize method being >>>> required! Hence I suggest leaving it alone. >>>> regarding removing the others I don't have strong opinions and it >>>> seems to >>

Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-20 Thread Matthias J. Sax
Hi, thanks for updating the KIP. A couple of follow-up comments: * Nit: Why are "Reset to Earliest" and "Reset to Latest" "reset by time" options -- IMHO they belong to "reset by position"? * Nit: Description of "Reset to Earliest" > using Kafka Consumer's `auto.offset.reset` to `earliest` I

Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-23 Thread Matthias J. Sax
There is a voting thread on dev list. Please put your vote there. Thx. -Matthias On 2/23/17 8:15 PM, Mahendra Kariya wrote: > +1 for such a tool. It would be of great help in a lot of use cases. > > On Thu, Feb 23, 2017 at 11:44 PM, Matthias J. Sax <matth...@confluent.io> >

Re: Immutable Record with Kafka Stream

2017-02-24 Thread Matthias J. Sax
First, I want to mention that you do not see "duplicates" -- you see late updates. Kafka Streams embraces "change", and there is no such thing as a final aggregate; each agg output record is an update/refinement of the result. Strict filtering of "late updates" is hard in Kafka Streams. If you

Re: Storm kafka integration

2017-02-19 Thread Matthias J. Sax
You should ask the Storm people. Kafka Spout is not provided by the Kafka community. Or maybe try out Kafka's Streams API (couldn't resist... ;) ) -Matthias On 2/19/17 11:49 AM, pradeep s wrote: > Hi, > I am using Storm 1.0.2 and Kafka 0.10.1.1 and have a query on Spout code to > integrate with Kafka.

Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-02-14 Thread Matthias J. Sax
You can already output any number of records within .transform() using the provided Context object from init()... -Matthias On 2/14/17 9:16 AM, Guozhang Wang wrote: >> and you can't output multiple records or branching logic from a > transform(); > > For outputting multiple records in transform, we

Re: Kafka streams: Getting a state store associated to a processor

2017-02-13 Thread Matthias J. Sax
Can you try this out with the 0.10.2 branch or current trunk? We put in some fixes like you suggested already. Would be nice to get feedback on whether those fixes resolve the issue for you. Some more comments inline. -Matthias On 2/13/17 12:27 PM, Adam Warski wrote: > Following this answer, I checked that

Re: Partitioning behavior of Kafka Streams without explicit StreamPartitioner

2017-02-09 Thread Matthias J. Sax
It's by design. The reason is that Streams uses a single producer to write to different output topics. As different output topics might have different key and/or value types, the producer is instantiated with byte[] as key and value type, and Streams serializes the data before handing it to the
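
An explicit StreamPartitioner can still be supplied at the sink, where the concrete key/value types are known (types, serdes, and topic are illustrative):

    StreamPartitioner<String, Order> byCustomer = (key, value, numPartitions) ->
        (value.getCustomerId().hashCode() & 0x7fffffff) % numPartitions;  // non-negative partition
    orders.to(keySerde, orderSerde, byCustomer, "output-topic");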

Re: Table a KStream

2017-02-10 Thread Matthias J. Sax
46, Nick DeCoursin <n.decour...@foodpanda.com> > wrote: > >> Thank you very much, both suggestions are wonderful, and I will try them. >> Have a great day! >> >> Kind regards, >> Nick >> >> On 24 January 2017 at 19:46, Matthias J. Sax <m

Re: [VOTE] 0.10.2.0 RC1

2017-02-10 Thread Matthias J. Sax
Hi Ian, thanks for reporting this. I had a look at the stack trace and code and the whole situation is quite confusing. The exception itself is expected but we have a try-catch-block that should swallow the exception and it should never bubble up: In AbstractTaskCreator.retryWithBackoff a

Re: KTable and cleanup.policy=compact

2017-02-13 Thread Matthias J. Sax
PM, Eno Thereska <eno.there...@gmail.com> >> wrote: >> >>> Yeah makes sense. I was looking at it from the point of view of keeping >>> all data forever. >>> >>> Eno >>> >>>> On 8 Feb 2017, at 20:27, Matthias J. Sax <mat

Re: Kafka Streams: got bit by WindowedSerializer (only window.start is serialized)

2017-01-17 Thread Matthias J. Sax
With regard to the JIRA: I guess we do not want to put the end timestamp into the key. For general usage, windows of different types are written into different topics. Thus, Nicolas' use case is quite special, and using a custom Serde is the better approach to handle it, instead of changing Kafka

Re: kafka streams consumer partition assignment is uneven

2017-01-17 Thread Matthias J. Sax
Sorry for answering late. The mapping from partitions to threads also depends on the structure of your topology. As you mention that you have a quite complex one, I assume that this is the reason for the uneven distribution. If you want to dig deeper, it would be helpful to know the structure of

Re: Wait a few seconds before initializing state stores, so others don't have to wait before joining.

2017-02-28 Thread Matthias J. Sax
Hi Nicolas, an optimization like this would make a lot of sense. We did have some discussions around this already. However, it's more tricky to do than it seems at first glance. We hope to introduce something like this for the next release. -Matthias On 2/28/17 9:10 AM, Nicolas Fouché

Re: Kafka Streams vs Spark Streaming

2017-02-28 Thread Matthias J. Sax
If a store is backed by a changelog topic, the changelog topic is responsible for holding the latest state of the store. Thus, the topic must store the latest value per key. For this, we use a compacted topic. In case of a restore, the local RocksDB store is cleared so it is empty, and we read the

Re: Kafka Streams vs Spark Streaming

2017-02-28 Thread Matthias J. Sax
Tainji, Streams provides at-least-once processing guarantees. Thus, all flush/commits must be aligned -- otherwise, this guarantee might break. -Matthias On 2/28/17 6:40 AM, Damian Guy wrote: > Hi Tainji, > > The changelogs are flushed on the commit interval. It isn't currently > possible to

Re: Kafka streams questions

2017-02-28 Thread Matthias J. Sax
Adding partitions: You should not add partitions at runtime -- it might break the semantics of your application because it might "mess up" your hash partitioning. Cf. https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowtoscaleaStreamsapp,i.e.,increasenumberofinputpartitions? If you are

Re: Kafka Streams vs Spark Streaming

2017-02-28 Thread Matthias J. Sax
I > misunderstanding? > >> On Feb 28, 2017, at 9:12 AM, Matthias J. Sax <matth...@confluent.io> wrote: >> >> If a store is backed by a changelog topic, the changelog topic is >> responsible to hold the latest state of the store. Thus, the topic must >> sto

Re: when will the messages be sent to broker

2017-03-01 Thread Matthias J. Sax
> Yuanjia Li > > From: Matthias J. Sax > Date: 2017-03-02 01:42 > To: users > Subject: Re: when will the messages be sent to broker > There is also the linger.ms parameter, which is an upper bound on how long a (not > yet filled) buffer is held before sending it, even if it's not
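
The batching knobs mentioned here, in producer-configuration form (all values are illustrative):

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("batch.size", 16384);  // target batch size in bytes, per partition
    props.put("linger.ms", 100);     // upper bound on how long a non-full batch is held back
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);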
