Re: [VOTE] KIP-657: Add Customized Kafka Streams Logo

2020-08-19 Thread Michael Noll
For what it's worth, here is an example sketch that I came up with. Point
is to show an alternative direction for the KStreams logo.

https://ibb.co/bmZxDCg

Thinking process:

   - It shows much more clearly (I hope) that KStreams is an official part
   of Kafka.
   - The Kafka logo is still front and center, and KStreams orbits around
   it like electrons around the Kafka core/nucleus. That’s important because
   we want users to adopt all of Kafka, not just bits and pieces.
   - It uses and builds upon the same ‘simple is beautiful’ style of the
   original Kafka logo. That also has the nice side-effect that it alludes to
   Kafka’s and KStreams’ architectural simplicity.
   - It picks up the good idea in the original logo candidates to convey
   the movement and flow of stream processing.
   - Execution-wise, and like the main Kafka logo, this logo candidate
   works well in smaller size, too, because of its simple and clear lines.
   (Logo types like the otter ones tend to become undecipherable at smaller
   sizes.)
   - It uses the same color scheme of the revamped AK website for brand
   consistency.

I am sure we can come up with even better logo candidates.  But the
suggestion above is, in my book, certainly a better option than the otters.

-Michael



On Wed, Aug 19, 2020 at 11:09 PM Boyang Chen 
wrote:

> Hey Ben,
>
> that otter was supposed to be a river-otter to connect to "streams". And of
> course, it's cute :)
>
> On Wed, Aug 19, 2020 at 12:41 PM Philip Schmitt <
> philip.schm...@outlook.com>
> wrote:
>
> > Hi,
> >
> > I’m with Robin and Michael here.
> >
> > What this decision needs is a good design brief.
> > This article seems decent:
> >
> https://yourcreativejunkie.com/logo-design-brief-the-ultimate-guide-for-designers/
> >
> > Robin is right about the usage requirements.
> > It goes a bit beyond resolution. How does the logo work when it’s on a
> > sticker on someone’s laptop? Might there be some cases, where you want to
> > print it in black and white?
> > And how would it look if you put the Kafka, ksqlDB, and Streams stickers
> > on a laptop?
> >
> > Of the two, I prefer the first option.
> > The brown on black is a bit subdued – it might not work well on a t-shirt
> > or a laptop sticker. Maybe that could be improved by using a bolder
> color,
> > but once it gets smaller or lower-resolution, it may not work any longer.
> >
> >
> > Regards,
> > Philip
> >
> >
> > P.S.:
> > Another article about what makes a good logo:
> > https://vanschneider.com/what-makes-a-good-logo
> >
> > P.P.S.:
> >
> > If I were to pick a logo for Streams, I’d choose something that fits well
> > with Kafka and ksqlDB.
> >
> > ksqlDB has the rocket.
> > I can’t remember (or find) the reasoning behind the Kafka logo (aside
> from
> > representing a K). Was there something about planets orbiting the sun? Or
> > was it the atom?
> >
> > So I might stick with a space/science metaphor.
> > Could Streams be a comet? UFO? Star? Eclipse? ...
> > Maybe a satellite logo for Connect.
> >
> > Space inspiration: https://thenounproject.com/term/space/
> >
> >
> >
> >
> > 
> > From: Robin Moffatt 
> > Sent: Wednesday, August 19, 2020 6:24 PM
> > To: users@kafka.apache.org 
> > Cc: d...@kafka.apache.org 
> > Subject: Re: [VOTE] KIP-657: Add Customized Kafka Streams Logo
> >
> > I echo what Michael says here.
> >
> > Another consideration is that logos are often shrunk (when used on
> slides)
> > and need to work at lower resolution (think: printing swag, stitching
> > socks, etc) and so whatever logo we come up with needs to not be too
> fiddly
> > in the level of detail - something that I think both the current proposed
> > options will fall foul of IMHO.
> >
> >
> > On Wed, 19 Aug 2020 at 15:33, Michael Noll  wrote:
> >
> > > Hi all!
> > >
> > > Great to see we are in the process of creating a cool logo for Kafka
> > > Streams.  First, I apologize for sharing feedback so late -- I just
> > learned
> > > about it today. :-)
> > >
> > > Here's my *personal, subjective* opinion on the currently two logo
> > > candidates for Kafka Streams.
> > >
> > > TL;DR: Sorry, but I really don't like either of the proposed "otter"
> > logos.
> > > Let me try to explain why.
> > >
> > >- The choice to use an animal, regardless of which specific animal,
> > >seems random and doesn

Re: [VOTE] KIP-657: Add Customized Kafka Streams Logo

2020-08-19 Thread Michael Noll
Hi all!

Great to see we are in the process of creating a cool logo for Kafka
Streams.  First, I apologize for sharing feedback so late -- I just learned
about it today. :-)

Here's my *personal, subjective* opinion on the two current logo
candidates for Kafka Streams.

TL;DR: Sorry, but I really don't like either of the proposed "otter" logos.
Let me try to explain why.

   - The choice to use an animal, regardless of which specific animal,
   seems random and doesn't fit Kafka. (What's the purpose? To show that
   KStreams is 'cute'?) In comparison, the O’Reilly books always have an
   animal cover, that’s their style, and it is very recognizable.  Kafka
   however has its own, different style.  The Kafka logo has clear, simple
   lines to achieve an abstract and ‘techy’ look, which also alludes nicely to
   its architectural simplicity. Its logo is also a smart play on the
   Kafka-identifying letter “K” and alluding to it being a distributed system
   (the circles and links that make the K).
   - The proposed logos, however, make it appear as if KStreams is a
   third-party technology that was bolted onto Kafka. They certainly, for me,
   do not convey the message "Kafka Streams is an official part of Apache
   Kafka".
   - I, too, don't like the way the main Kafka logo is obscured (a concern
   already voiced in this thread). Also, the Kafka 'logo' embedded in the
   proposed KStreams logos is not the original one.
   - None of the proposed KStreams logos visually match the Kafka logo.
   They have a totally different style, font, line art, and color scheme.
   - Execution-wise, the main Kafka logo looks great at all sizes.  The
   style of the otter logos, in comparison, becomes undecipherable at smaller
   sizes.

What I would suggest is to first agree on what the KStreams logo is
supposed to convey to the reader.  Here's my personal take:

Objective 1: First and foremost, the KStreams logo should make it clear and
obvious that KStreams is an official and integral part of Apache Kafka.
This applies to both what is depicted and how it is depicted (like font,
line art, colors).
Objective 2: The logo should allude to the role of KStreams in the Kafka
project, which is the processing part.  That is, "doing something useful to
the data in Kafka".

The "circling arrow" aspect of the current otter logos does allude to
"continuous processing", which is going in the direction of (2), but the
logos do not meet (1) in my opinion.

-Michael




On Tue, Aug 18, 2020 at 10:34 PM Matthias J. Sax  wrote:

> Adding the user mailing list -- I think we should accept votes on both
> lists for this special case, as it's not a technical decision.
>
> @Boyang: as mentioned by Bruno, can we maybe add black/white options for
> both proposals, too?
>
> I also agree that Design B is not ideal with regard to the Kafka logo.
> Would it be possible to change Design B accordingly?
>
> I am not a font expert, but the fonts in both designs are different and I
> am wondering if there is an official Apache Kafka font that we should
> reuse to make sure that the logos align -- I would expect that both
> logos (including "Apache Kafka" and "Kafka Streams" names) will be used
> next to each other and it would look awkward if the font differs.
>
>
> -Matthias
>
> On 8/18/20 11:28 AM, Navinder Brar wrote:
> > Hi,
> > Thanks for the KIP, really like the idea. I am +1(non-binding) on A
> mainly because I felt like you have to tilt your head to realize the
> otter's head in B.
> > Regards,Navinder
> >
> > On Tuesday, 18 August, 2020, 11:44:20 pm IST, Guozhang Wang <
> wangg...@gmail.com> wrote:
> >
> >  I'm leaning towards design B primarily because it reminds me of the
> Firefox
> > logo which I like a lot. But I also share Adam's concern that it should
> > better not obscure the Kafka logo --- so if we can tweak a bit to fix it
> my
> > vote goes to B, otherwise A :)
> >
> >
> > Guozhang
> >
> > On Tue, Aug 18, 2020 at 9:48 AM Bruno Cadonna 
> wrote:
> >
> >> Thanks for the KIP!
> >>
> >> I am +1 (non-binding) for A.
> >>
> >> I would also like to hear opinions whether the logo should be colorized
> >> or just black and white.
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >> On 15.08.20 16:05, Adam Bellemare wrote:
> >>> I prefer Design B, but given that I missed the discussion thread, I
> think
> >>> it would be better without the Otter obscuring any part of the Kafka
> >> logo.
> >>>
> >>> On Thu, Aug 13, 2020 at 6:31 PM Boyang Chen <
> reluctanthero...@gmail.com>
> >>> wrote:
> >>>
>  Hello everyone,
> 
>  I would like to start a vote thread for KIP-657:
> 
> 
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-657%3A+Add+Customized+Kafka+Streams+Logo
> 
>  This KIP is aiming to add a new logo for the Kafka Streams library.
> And
> >> we
>  prepared two candidates with a cute otter. You could look up the KIP
> to
>  find those logos.
> 
> 
>  Please post your vote against these two

Re: Kafka-streams: mix Processor API with windowed grouping

2018-04-10 Thread Michael Noll
Also, if you want (or can tolerate) probabilistic counting, with the option
to do TopN in that manner as well, we have an example that uses Count Min
Sketch:
https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala

The link/branch above is the code variant for Kafka 1.0.

The example implements a custom (fault-tolerant) state store backed by CMS,
which is then used in a Transformer.  The Transformer is then plugged into
the DSL for a mix-and-match setup of DSL and Processor API.


On Mon, Apr 9, 2018 at 9:34 PM, Dmitriy Vsekhvalnov 
wrote:

> Thanks again,
>
> yeah we saw that example for sure :)
>
> Ok, gonna try low-level Transformer and see how it goes.
>
>
> On Mon, Apr 9, 2018 at 9:17 PM, Matthias J. Sax 
> wrote:
>
> > For (1), no.
> >
> > If you want to do manual put/get you should use a Transformer and
> > implement a custom operator.
> >
> > Btw: here is an example of TopN:
> > https://github.com/confluentinc/kafka-streams-
> > examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/
> > TopArticlesExampleDriver.java
> >
> >
> >
> > -Matthias
> >
> > On 4/9/18 4:46 AM, Dmitriy Vsekhvalnov wrote:
> > > Hi Matthias, thanks
> > >
> > > clarifications below:
> > >
> > > 1. .aggregate( () -> .. ,
> > >(k, v, agg) -> {
> > >//Can i access KV store here for manual
> > put/get?
> > >   });
> > >
> > > 2. TopN is not hard, we using pretty much same approach you describe,
> > just
> > > with bounded priority queue.  The problematic part with 'remaining
> > count' -
> > > everything else not in topN records. It appeared to be quite complex in
> > > streaming world (or we missed something). I'll try to illustrate,
> > assuming
> > > simplified event flow:
> > >
> > >  - acme.com: 100 hits  -> too small, not in TopN, we adding it to
> > remaining
> > > count
> > >  - ... some time later
> > >  - acme.com: 150 hits -> still too small, adding to remaining count
> > >
> > > Problem: we added 250 hits to remaining, but actually we had to add
> only
> > > 150 hits. We have to subtract previous count and it means we need to
> keep
> > > them all somewhere. That's where we hope access to KV store can help.
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Sat, Apr 7, 2018 at 10:11 PM, Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> >  ok, question then - is it possible to use state store with
> > .aggregate()?
> > >>
> > >> Not sure what you exactly mean by this. An aggregations always uses a
> > >> store; it's a stateful operation and cannot be computed without a
> store.
> > >>
> > >> For TopN, if you get the hit-count as input, you can use a
> > >> `.aggregate()` operator that uses an array or list as output -- this
> > >> list contains the topN and, each time aggregate() is called, you check
> > >> if the new count replaces an existing count in the array/list.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 4/6/18 10:36 AM, Dmitriy Vsekhvalnov wrote:
> > >>> Thanks guys,
> > >>>
> > >>> ok, question then - is it possible to use state store with
> > .aggregate()?
> > >>>
> > >>> Here are some details on counting, we basically looking for TopN +
> > >>> Remaining calculation.
> > >>>
> > >>> Example:
> > >>>
> > >>> - incoming data: api url -> hit count
> > >>>
> > >>> - we want output: Top 20 urls per each domain per hour + remaining
> > count
> > >>> per domain (e.g. sum of all other urls hits that do not belong to top
> > 10
> > >>> per each domain per hour).
> > >>>
> > >>> With some grouping variations.
> > >>>
> > >>> Make some sense? Always open for better ideas :)
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> On Fri, Apr 6, 2018 at 8:21 PM, Guozhang Wang 
> > >> wrote:
> > >>>
> >  Hello Dmitriy,
> > 
> >  You can "simulate" an lower-level processor API by 1) adding the
> > stores
> > >> you
> >  need via the builder#addStore(); 2) do a manual "through" call after
> >  "selectKey" (the selected key will be the same as your original
> > groupBy
> >  call), and then from the repartitioned stream add the `transform()`
> >  operator to do manual windowed counting.
> > 
> >  But before you really go into this route, I'd first like to validate
> > if
> > >> the
> >  provided `Aggregate`, `Initialize` functions really cannot meet your
> >  "overcomplicated
> >  version of record counting", could you elaborate a bit more on this
> > >> logic
> >  so maybe we can still work around it with the pure high-level DSL?
> > 
> > 
> >  Guozhang
> > 
> > 
> >  On Fri, Apr 6, 2018 at 8:49 AM, Dmitriy Vsekhvalnov <
> >  dvsekhval...@gmail.com>
> >  wrote:
> > 
> > > Hey, good day everyone,
> > >
> > > another kafka-streams friday question.
> > >
> > > We hit the wal

Re: Kafka 11 | Stream Application crashed the brokers

2017-12-01 Thread Michael Noll
Thanks for reporting back, Sameer!


On Fri, Dec 1, 2017 at 2:46 AM, Guozhang Wang  wrote:

> Thanks for confirming Sameer.
>
>
> Guozhang
>
> On Thu, Nov 30, 2017 at 3:52 AM, Sameer Kumar 
> wrote:
>
> > Just wanted to let everyone know that this issue got fixed in Kafka
> 1.0.0.
> > I recently migrated to it and didnt find the issue any longer.
> >
> > -Sameer.
> >
> > On Thu, Sep 14, 2017 at 5:50 PM, Sameer Kumar 
> > wrote:
> >
> > > ;Ok. I will inspect this further and keep everyone posted on this.
> > >
> > > -Sameer.
> > >
> > > On Thu, Sep 14, 2017 at 1:46 AM, Guozhang Wang 
> > wrote:
> > >
> > >> When exactly_once is turned on the transactional id would be set
> > >> automatically by the Streams client.
> > >>
> > >> What I'd inspect is the healthiness of the brokers since the "
> > >> *TimeoutException*", if you have metrics on the broker servers
> regarding
> > >> request handler thread idleness / request queue length / request rate
> > etc,
> > >> you can monitor that and see what could be the possible causes of the
> > >> broker unavailability.
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Wed, Sep 13, 2017 at 8:26 AM, Sameer Kumar  >
> > >> wrote:
> > >>
> > >> > Adding more info:-
> > >> >
> > >> > Hi Guozhang,
> > >> >
> > >> > I was using exactly_once processing here, I can see this in the
> client
> > >> > logs, however I am not setting transaction id though.
> > >> >
> > >> > application.id = c-7-e6
> > >> > application.server =
> > >> > bootstrap.servers = [172.29.65.190:9092, 172.29.65.192:9092,
> > >> > 172.29.65.193:9092]
> > >> > buffered.records.per.partition = 1
> > >> > cache.max.bytes.buffering = 2097152000
> > >> > client.id =
> > >> > commit.interval.ms = 5000
> > >> > connections.max.idle.ms = 54
> > >> > default.key.serde = class
> > >> > org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> > >> > default.timestamp.extractor = class
> > >> > org.apache.kafka.streams.processor.FailOnInvalidTimestamp
> > >> > default.value.serde = class
> > >> > org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> > >> > key.serde = class org.apache.kafka.common.serial
> > >> ization.Serdes$StringSerde
> > >> > metadata.max.age.ms = 6
> > >> > metric.reporters = []
> > >> > metrics.num.samples = 2
> > >> > metrics.recording.level = INFO
> > >> > metrics.sample.window.ms = 3
> > >> > num.standby.replicas = 0
> > >> > num.stream.threads = 15
> > >> > partition.grouper = class
> > >> > org.apache.kafka.streams.processor.DefaultPartitionGrouper
> > >> > poll.ms = 100
> > >> > processing.guarantee = exactly_once
> > >> > receive.buffer.bytes = 32768
> > >> > reconnect.backoff.max.ms = 1000
> > >> > reconnect.backoff.ms = 50
> > >> > replication.factor = 1
> > >> > request.timeout.ms = 4
> > >> > retry.backoff.ms = 100
> > >> > rocksdb.config.setter = null
> > >> > security.protocol = PLAINTEXT
> > >> > send.buffer.bytes = 131072
> > >> > state.cleanup.delay.ms = 4611686018427386903
> > >> > state.dir = /data/streampoc/
> > >> > timestamp.extractor = class
> > >> > org.apache.kafka.streams.processor.WallclockTimestampExtractor
> > >> > value.serde = class org.apache.kafka.common.serialization.Serdes$
> > >> > StringSerde
> > >> > windowstore.changelog.additional.retention.ms = 8640
> > >> > zookeeper.connect =
> > >> >
> > >> >
> > >> > On Wed, Sep 13, 2017 at 12:16 PM, Sameer Kumar <
> > sam.kum.w...@gmail.com>
> > >> > wrote:
> > >> >
> > >> > > Hi Guozhang,
> > >> > >
> > >> > > The producer sending data to this topic is not running
> concurrently
> > >> with
> > >> > > the stream processing. I had first ingested the data from another
> > >> cluster
> > >> > > and then have the stream processing ran on it. The producer code
> is
> > >> > written
> > >> > > by me and it doesnt have transactions on by default.
> > >> > >
> > >> > > I will double check if someone else has transaction turned on, but
> > >> this
> > >> > is
> > >> > > quite unlikely. Is there someway to verify it through logs.
> > >> > >
> > >> > > All of this behavior works fine when brokers are run on Kafka 10,
> > this
> > >> > > might be because transactions are only available on Kafka11. I am
> > >> > > suspecting would there be a case that too much processing is
> causing
> > >> one
> > >> > of
> > >> > > the brokers to crash. The timeouts are indicating that it is
> taking
> > >> time
> > >> > to
> > >> > > send data
> > >> > >
> > >> > > I have tried this behavior also on a another cluster which I
> > >> exclusively
> > >> > > use it for myself and found the same behavior there as well.
> > >> > >
> > >> > > What do you think should be our next step so that we can get to
> the
> > >> root
> > >> > > of the issue.
> > >> > >
> > >> > > -Sameer.
> > >> > >
> > >> > > On Wed, Sep 13, 2017 at 6:14 AM, Guozhang Wang <
> wangg...@gmail.com>
> > >> > wrote:
> > >> > >
> > >> > >> Hi Sameer,
> > >> > >>
> > >> > >> If no clients has transactions turned on the
> `__transaction_state`
>

Re: left join between PageViews(KStream) & UserProfile (KTable)

2017-10-23 Thread Michael Noll
> *What key should the join on ? *

The message key, in both cases, should contain the user ID in String format.

> *There seems to be no common key (eg. user) between the 2 classes - PageView
and UserProfile*

The user ID is the common key, but the user ID is stored in the respective
message *keys*, whereas PageView and UserProfile values are stored in the
message *values* in Kafka.
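To make that concrete, here is a minimal sketch of what sample input records could look like, produced with the plain Java producer; the topic names are assumed to match the demo, and the JSON payloads are made up:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PageViewSampleDataSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // PageView record: key = user ID, value = the PageView payload.
      producer.send(new ProducerRecord<>("streams-pageview-input", "alice",
          "{\"user\":\"alice\",\"page\":\"index.html\",\"timestamp\":1508700000000}"));
      // UserProfile record: key = the same user ID, value = the UserProfile payload.
      producer.send(new ProducerRecord<>("streams-userprofile-input", "alice",
          "{\"region\":\"europe\",\"timestamp\":1508700000000}"));
    }
  }
}

Both records carry the user ID "alice" as the message key, and that key is what the leftJoin matches on.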

Btw, you might also want to take a look at
https://github.com/confluentinc/kafka-streams-examples, which has many more
examples, including a PageView demo called `PageViewRegionLambdaExample`
that is similar to the one you shared above.  `PageViewRegionLambdaExample`
ships with a data generator for the example, and instructions for how to
run it.

It would be great if we had the same setup for the Apache Kafka page view
example, of course.  Pull requests are very welcome. :-)

Hope this helps,
Michael






On Sun, Oct 22, 2017 at 9:30 PM, karan alang  wrote:

> Hello all -
> I'm trying to run the sample PageView Kafka streams example,
> (ref -
> https://github.com/apache/kafka/tree/0.11.0/streams/
> examples/src/main/java/org/apache/kafka/streams/examples/pageview
> )
> and have a question ..
>
> There is a leftJoin between PageView (Kstream) and UserProfile(Ktable) as
> shown below... The join should give - PageViewByRegions
>
> *What key should the join on ? *
>
> *There seems to be no common key (eg. user) between the 2 classes -
> PageView and UserProfile*
>
> *Also, what should a sample input data in the PageView & UserProfile
> specific topics be ?*
> Do we need to add surrogate (id) key to the input for these, to enable the
> left join ?
>
> Code :
>
> *- static Classes  *
>
> >
> > static public class PageView {
> > public String user;
> > public String page;
> > public Long timestamp;
> > }
> >
>
>
> > static public class UserProfile {
> > public String region;
> > public Long timestamp;
> > }
>
>
>
> *--Join between the views( i.e. KStream - PageView) & users (KTable
> - UserProfile) *
>
>
> KStream<String, PageViewByRegion> kstream1 =
> > *views.leftJoin(users, *
> > //PageView - first value type
> > //UserProfile - 2nd value type
> > //PageViewByRegion - Joined Value
> > new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
> > @Override
> > public PageViewByRegion apply(PageView view, UserProfile
> > profile) {
> > PageViewByRegion viewByRegion = new PageViewByRegion();
> > viewByRegion.user = view.user;
> > viewByRegion.page = view.page;
> > System.out.println(" viewByRegion.user " +
> > viewByRegion.user);
> > System.out.println(" viewByRegion.page " +
> > viewByRegion.page);
> >
> > if (profile != null) {
> > viewByRegion.region = profile.region;
> > } else {
> > viewByRegion.region = "UNKNOWN";
> > }
> >
> > System.out.println(" viewByRegion.page " +
> > viewByRegion.region);
> >
> > return viewByRegion;
> > }
> > }
> > )
>


Re: KTable-KTable Join Semantics on NULL Key

2017-09-14 Thread Michael Noll
Perhaps a clarification to what Damian said:

It is shown in the (HTML) table at the link you shared [1] what happens
when you get null values for a key.

We also have slightly better join documentation at [2], the content/text of
which we are currently migrating over to the official Apache Kafka
documentation for the Streams API (under
kafka.apache.org/documentation/streams).

[1]
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KTable-KTableJoin
[2]
https://docs.confluent.io/current/streams/developer-guide.html#kstream-kstream-join
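As a quick illustration of the tombstone behavior described in that table -- a minimal sketch with made-up topic names, written against a more recent Streams API:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class KTableJoinTombstoneSketch {
  public static StreamsBuilder topology() {
    StreamsBuilder builder = new StreamsBuilder();
    KTable<String, String> left =
        builder.table("left-topic", Consumed.with(Serdes.String(), Serdes.String()));
    KTable<String, String> right =
        builder.table("right-topic", Consumed.with(Serdes.String(), Serdes.String()));

    // A <key, null> record on either input topic is a tombstone: it deletes the key
    // from that KTable. The ValueJoiner is never called with a null input; instead,
    // if a join result previously existed for that key, a <key, null> record is
    // forwarded downstream to retract it (for the inner join shown here; left and
    // outer joins behave as per the linked table).
    KTable<String, String> joined = left.join(right, (l, r) -> l + "/" + r);
    joined.toStream().to("joined-topic", Produced.with(Serdes.String(), Serdes.String()));
    return builder;
  }
}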


On Fri, Sep 8, 2017 at 3:19 PM, Damian Guy  wrote:

> It is shown in the table what happens when you get null values for a key.
>
> On Fri, 8 Sep 2017 at 12:31 Kamal Chandraprakash <
> kamal.chandraprak...@gmail.com> wrote:
>
> > Hi Kafka Users,
> >
> > KTable-KTable Join Semantics is explained in detailed [here][1]. But,
> > it's not clear when the input record is , some times the output
> > records are generated  and in some cases it's not.
> >
> > It will be helpful, if someone explain on how the output records are
> > generated for all the 3 types of joins on receiving a record with NULL
> > value.
> >
> > [1]: https://cwiki.apache.org/confluence/display/KAFKA/
> > Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KTable-KTableJoin
> >
> > -- Kamal
> >
>


Re: Avro Serialization & Schema Registry ..

2017-07-19 Thread Michael Noll
In short, Avro serializers/deserializers provided by Confluent always
integrate with (and thus require) Confluent Schema Registry.  That's why
you must set the `schema.registry.url` configuration for them.

If you want to use Avro but without a schema registry, you'd need to work
with the Avro API directly.  You can also implement your own "no schema
registry" Avro serializers/deserializers for more convenience, of course.

Best wishes,
Michael



On Mon, Jul 17, 2017 at 8:51 PM, Debasish Ghosh 
wrote:

> I am using the class io.confluent.kafka.serializers.KafkaAvroSerializer as
> one of the base abstractions for Avro serialization. From the stack trace I
> see that the instantiation of this class needs set up of
> KafkaAvroSerializerConfig which needs a value for the schema registry url
> ..
>
> regards.
>
> On Tue, Jul 18, 2017 at 12:02 AM, Richard L. Burton III <
> mrbur...@gmail.com>
> wrote:
>
> > For your first question, no you can use the avro API.
> >
> >
> >
> > On Mon, Jul 17, 2017 at 2:29 PM Debasish Ghosh  >
> > wrote:
> >
> >> Hi -
> >>
> >> I am using Avro Serialization in a Kafka Streams application through the
> >> following dependency ..
> >>
> >> "io.confluent"  % "kafka-avro-serializer" % "3.2.2"
> >>
> >> My question is : Is schema registry mandatory for using Avro
> Serialization
> >> ? Because when I run the application I get the following exception where
> >> it
> >> complains that there is no default value for "schema.registry.url". My
> >> current settings for StreamsConfig are the following ..
> >>
> >>   settings.put(StreamsConfig.APPLICATION_ID_CONFIG,
> >> "kstream-log-processing-avro")
> >>settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
> >>settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >> Serdes.ByteArray.getClass.getName)
> >>settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >> classOf[SpecificAvroSerde[LogRecordAvro]])
> >>
> >> .. and the exception ..
> >>
> >> 23:49:34.054 TKD [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator
> -
> >> User provided listener
> >> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> >> kstream-log-processing-avro failed on partition assignment
> >> org.apache.kafka.streams.errors.StreamsException: Failed to configure
> >> value
> >> serde class com.lightbend.fdp.sample.kstream.serializers.
> >> SpecificAvroSerde
> >> at org.apache.kafka.streams.StreamsConfig.valueSerde(
> >> StreamsConfig.java:594)
> >> at
> >> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<
> >> init>(AbstractProcessorContext.java:58)
> >> at
> >> org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.(
> >> ProcessorContextImpl.java:41)
> >> at
> >> org.apache.kafka.streams.processor.internals.
> >> StreamTask.(StreamTask.java:137)
> >> at
> >> org.apache.kafka.streams.processor.internals.
> >> StreamThread.createStreamTask(StreamThread.java:864)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.
> >> createTask(StreamThread.java:1237)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamThread$
> >> AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
> >> at
> >> org.apache.kafka.streams.processor.internals.
> StreamThread.addStreamTasks(
> >> StreamThread.java:967)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamThread.access$600(
> >> StreamThread.java:69)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamThread$1.
> >> onPartitionsAssigned(StreamThread.java:234)
> >> at
> >> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> >> onJoinComplete(ConsumerCoordinator.java:259)
> >> at
> >> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> >> joinGroupIfNeeded(AbstractCoordinator.java:352)
> >> at
> >> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> >> ensureActiveGroup(AbstractCoordinator.java:303)
> >> at
> >> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> >> ConsumerCoordinator.java:290)
> >> at
> >> org.apache.kafka.clients.consumer.KafkaConsumer.
> >> pollOnce(KafkaConsumer.java:1029)
> >> at
> >> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> >> KafkaConsumer.java:995)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> >> StreamThread.java:592)
> >> at
> >> org.apache.kafka.streams.processor.internals.
> >> StreamThread.run(StreamThread.java:361)
> >> Caused by: io.confluent.common.config.ConfigException: Missing required
> >> configuration "schema.registry.url" which has no default value.
> >> at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:241)
> >> at io.confluent.common.config.AbstractConfig.(
> >> AbstractConfig.java:76)
> >> at
> >> io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.(
> >> AbstractKafkaAvroSerDeConfig.java:51)
> >> at
> >> io.confluent.kafka.serializers.KafkaAvroSerializerConfig.(
> >> KafkaAvroSerializerConfig.java:33)

Re: Finding StreamsMetadata with value-dependent partitioning

2017-06-06 Thread Michael Noll
Happy to hear you found a working solution, Steven!

-Michael



On Sat, Jun 3, 2017 at 12:53 AM, Steven Schlansker <
sschlans...@opentable.com> wrote:

> >
> > On Jun 2, 2017, at 3:32 PM, Matthias J. Sax 
> wrote:
> >
> > Thanks. That helps to understand the use case better.
> >
> > Rephrase to make sure I understood it correctly:
> >
> > 1) you are providing a custom partitioner to Streams that is base on one
> > field in your value (that's fine with regard to fault-tolerance :))
> > 2) you want to use interactive queries to query the store
> > 3) because of your custom partitioning schema, you need to manually
> > figure out the right application instance that hosts a key
> > 4) thus, you use a GlobalKTable to maintain the information from K to D
> > and thus to the partition ie, streams instance that hosts K
> >
> > If this is correct, than you cannot use the "by key" metadata interface.
> > It's designed to find the streams instance base in the key only -- but
> > your partitioner is based on the value. Internally, we call
> >
> >> final Integer partition = partitioner.partition(key, null,
> sourceTopicsInfo.maxPartitions);
> >
> > Note, that `value==null` -- at this point, we don't have any value
> > available and can't provide it to the partitioner.
> >
> > Thus, your approach to get all metadata is the only way you can go.
>
> Thanks for confirming this.  The code is a little ugly but I've done worse
> :)
>
> >
> >
> > Very interesting (and quite special) use case. :)
> >
> >
> > -Matthias
> >
> > On 6/2/17 2:32 PM, Steven Schlansker wrote:
> >>
> >>> On Jun 2, 2017, at 2:11 PM, Matthias J. Sax 
> wrote:
> >>>
> >>> I am not sure if I understand the use case correctly. Could you give
> >>> some more context?
> >>
> >> Happily, thanks for thinking about this!
> >>
> >>>
>  backing store whose partitioning is value dependent
> >>>
> >>> In infer that you are using a custom store and not default RocksDB? If
> >>> yes, what do you use? What does "value dependent" mean in this context?
> >>
> >> We're currently using the base in memory store.  We tried to use RocksDB
> >> but the tuning to get it running appropriately in a Linux container
> without
> >> tripping the cgroups OOM killer is nontrivial.
> >>
> >>
> >>> Right now, I am wondering, why you not just set a new key to get your
> >>> data grouped by the field you are interesting in? Also, if you don't
> >>> partitioned your data by key, you might break your streams application
> >>> with regard to fault-tolerance -- or does your custom store not rely on
> >>> changelog backup for fault-tolerance?
> >>>
> >>
> >> That's an interesting point about making transformed key.  But I don't
> think
> >> it simplifies my problem too much.  Essentially, I have a list of
> messages
> >> that should get delivered to destinations.  Each message has a primary
> key K
> >> and a destination D.
> >>
> >> We partition over D so that all messages to the same destination are
> handled by
> >> the same worker, to preserve ordering and implement local rate limits
> etc.
> >>
> >> I want to preserve the illusion to the client that they can look up a
> key with
> >> only K.  So, as an intermediate step, we use the GlobalKTable to look
> up D.  Once
> >> we have K,D we can then compute the partition and execute a lookup.
> >>
> >> Transforming the key to be a composite K,D isn't helpful because the
> end user still
> >> only knows K -- D's relevance is an implementation detail I wish to
> hide -- so you still
> >> need some sort of secondary lookup.
> >>
> >> We do use the changelog backup for fault tolerance -- how would having
> the partition
> >> based on the value break this?  Is the changelog implicitly partitioned
> by a partitioner
> >> other than the one we give to the topology?
> >>
> >> Hopefully that explains my situation a bit more?  Thanks!
> >>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>>
> >>> On 6/2/17 10:34 AM, Steven Schlansker wrote:
>  I have a KTable and backing store whose partitioning is value
> dependent.
>  I want certain groups of messages to be ordered and that grouping is
> determined
>  by one field (D) of the (possibly large) value.
> 
>  When I lookup by only K, obviously you don't know the partition it
> should be on.
>  So I will build a GlobalKTable of K -> D.  This gives me enough
> information
>  to determine the partition.
> 
>  Unfortunately, the KafkaStreams metadata API doesn't fit this use
> case well.
>  It allows you to either get all metadata, or by key -- but if you
> lookup by key
>  it just substitutes a null value (causing a downstream NPE)
> 
>  I can iterate over all metadata and compute the mapping of K -> K,D
> -> P
>  and then iterate over all metadata looking for P.  It's not difficult
> but ends
>  up being a bit of somewhat ugly code that feels like I shouldn't have
> to write it.
> 
>  Am I missing something here?  Is there a better way that I've
> m

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-05-30 Thread Michael Noll
Thanks for your work on this KIP, Eno -- much appreciated!

- I think it would help to improve the KIP by adding an end-to-end code
example that demonstrates, with the DSL and with the Processor API, how the
user would write a simple application that would then be augmented with the
proposed KIP changes to handle exceptions.  It should also become much
clearer then that e.g. the KIP would lead to different code paths for the
happy case and any failure scenarios.

- Do we have sufficient information available to make informed decisions on
what to do next?  For example, do we know in which part of the topology the
record failed? `ConsumerRecord` gives us access to topic, partition,
offset, timestamp, etc., but what about topology-related information (e.g.
what is the associated state store, if any)?

- Only partly on-topic for the scope of this KIP, but this is about the
bigger picture: This KIP would give users the option to send corrupted
records to dead letter queue (quarantine topic).  But, what pattern would
we advocate to process such a dead letter queue then, e.g. how to allow for
retries with backoff ("If the first record in the dead letter queue fails
again, then try the second record for the time being and go back to the
first record at a later time").  Jay and Jan already alluded to ordering
problems that will be caused by dead letter queues. As I said, retries
might be out of scope but perhaps the implications should be considered if
possible?

Also, I wrote the text below before reaching the point in the conversation
that this KIP's scope will be limited to exceptions in the category of
poison pills / deserialization errors.  But since Jay brought up user code
errors again, I decided to include it again.

snip
A meta comment: I am not sure about this split between the code for the
happy path (e.g. map/filter/... in the DSL) from the failure path (using
exception handlers).  In Scala, for example, we can do:

scala> val computation = scala.util.Try(1 / 0)
computation: scala.util.Try[Int] =
Failure(java.lang.ArithmeticException: / by zero)

scala> computation.getOrElse(42)
res2: Int = 42

Another example with Scala's pattern matching, which is similar to
`KStream#branch()`:

computation match {
  case scala.util.Success(x) => x * 5
  case scala.util.Failure(_) => 42
}

(The above isn't the most idiomatic way to handle this in Scala, but that's
not the point I'm trying to make here.)

Hence the question I'm raising here is: Do we want to have an API where you
code "the happy path", and then have a different code path for failures
(using exceptions and handlers);  or should we treat both Success and
Failure in the same way?

I think the failure/exception handling approach (as proposed in this KIP)
is well-suited for errors in the category of deserialization problems aka
poison pills, partly because the (default) serdes are defined through
configuration (explicit serdes however are defined through API calls).

However, I'm not yet convinced that the failure/exception handling approach
is the best idea for user code exceptions, e.g. if you fail to guard
against NPE in your lambdas or divide a number by zero.

scala> val stream = Seq(1, 2, 3, 4, 5)
stream: Seq[Int] = List(1, 2, 3, 4, 5)

// Here: Fallback to a sane default when encountering failed records
scala> stream.map(x => Try(1/(3 - x))).flatMap(t =>
Seq(t.getOrElse(42)))
res19: Seq[Int] = List(0, 1, 42, -1, 0)

// Here: Skip over failed records
scala> stream.map(x => Try(1/(3 - x))).collect{ case Success(s) => s }
res20: Seq[Int] = List(0, 1, -1, 0)

The above is more natural to me than using error handlers to define how to
deal with failed records (here, the value `3` causes an arithmetic
exception).  Again, it might help the KIP if we added an end-to-end example
for such user code errors.
snip




On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak 
wrote:

> Hi Jay,
>
> Eno mentioned that he will narrow down the scope to only ConsumerRecord
> deserialisation.
>
> I am working with Database Changelogs only. I would really not like to see
> a dead letter queue or something
> similliar. how am I expected to get these back in order. Just grind to
> hold an call me on the weekend. I'll fix it
> then in a few minutes rather spend 2 weeks ordering dead letters. (where
> reprocessing might be even the faster fix)
>
> Best Jan
>
>
>
>
> On 29.05.2017 20:23, Jay Kreps wrote:
>
>> - I think we should hold off on retries unless we have worked out the
>> full usage pattern, people can always implement their own. I think
>> the idea
>> is that you send the message to some kind of dead letter queue and
>> then
>> replay these later. This obviously destroys all semantic guarantees
>> we are
>> working hard to provide right now, which may be okay.
>>
>
>


Re: Kafka Streams 0.10.0.1 - multiple consumers not receiving messages

2017-04-28 Thread Michael Noll
To add to what Eno said:

You can of course use the Kafka Streams API to build an application that
consumes from multiple Kafka topics.  But, going back to your original
question, the scalability of Kafka and the Kafka Streams API is based on
partitions, not on topics.

-Michael




On Fri, Apr 28, 2017 at 6:28 PM, Eno Thereska 
wrote:

> Hi Henry,
>
> Kafka Streams scales differently and does not support having the same
> application ID subscribe to different topics for scale-out. The way we
> support scaling out if you want to use the same application id is through
> partitions, i.e., Kafka Streams automatically assigns partitions to your
> multiple instances. If you want to scale out using topics you'll need to
> use different application IDs.
>
> So in a nutshell this pattern is not supported. Was there a reason you
> needed to do it like that?
>
> Thanks
> Eno
>
> > On 28 Apr 2017, at 11:41, Henry Thacker  wrote:
> >
> > Should also add - there are definitely live incoming messages on both
> input
> > topics when my streams are running. The auto offset reset config is set
> to
> > "earliest" and because the input data streams are quite large (several
> > millions records each), I set a relatively small max poll records (200)
> so
> > we don't run into heartbeating issues if we restart intraday.
> >
> > Thanks,
> > Henry
> >
> > --
> > Henry Thacker
> >
> > On 28 April 2017 at 11:37:53, Henry Thacker (he...@henrythacker.com)
> wrote:
> >
> >> Hi Eno,
> >>
> >> Thanks for your reply - the code that builds the topology is something
> >> like this (I don't have email and the code access on the same machine
> >> unfortunately - so might not be 100% accurate / terribly formatted!).
> >>
> >> The stream application is a simple verifier which stores a tiny bit of
> >> state in a state store. The processor is custom and only has logic in
> >> init() to store the context and retrieve the store and process(...) to
> >> validate the incoming messages and forward these on when appropriate.
> >>
> >> There is no joining, aggregates or windowing.
> >>
> >> In public static void main:
> >>
> >> String topic = args[0];
> >> String output = args[1];
> >>
> >> KStreamBuilder builder = new KStreamBuilder();
> >>
> >> StateStoreSupplier stateStore =
> >> Stores.create("mystore").withStringKeys().withByteArrayValues().
> persistent().build();
> >>
> >> KStream stream = builder.stream(topic);
> >>
> >> builder.addStateStore(stateStore);
> >>
> >> stream.process(this::buildStreamProcessor, "mystore");
> >>
> >> stream.to(outputTopic);
> >>
> >> KafkaStreams streams = new KafkaStreams(builder, getProps());
> >> streams.setUncaughtExceptionHandler(...);
> >> streams.start();
> >>
> >> Thanks,
> >> Henry
> >>
> >>
> >> On 28 April 2017 at 11:26:07, Eno Thereska (eno.there...@gmail.com)
> wrote:
> >>
> >>> Hi Henry,
> >>>
> >>> Could you share the code that builds your topology so we see how the
> >>> topics are passed in? Also, this would depend on what the streaming
> logic
> >>> is doing with the topics, e.g., if you're joining them then both
> partitions
> >>> need to be consumed by the same instance.
> >>>
> >>> Eno
> >>>
> >>> On 28 Apr 2017, at 11:01, Henry Thacker 
> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I'm using Kafka 0.10.0.1 and Kafka streams. When I have two different
> >>> processes, Consumer 1 and 2. They both share the same application ID,
> but
> >>> subscribe for different single-partition topics. Only one stream
> consumer
> >>> receives messages.
> >>>
> >>> The non working stream consumer just sits there logging:
> >>>
> >>> Starting stream thread [StreamThread-1]
> >>> Discovered coordinator  (Id: ...) for group my-streamer
> >>> Revoking previously assigned partitions [] for group my-streamer
> >>> (Re-)joining group my-streamer
> >>> Successfully joined group my-streamer with generation 3
> >>> Setting newly assigned partitions [] for group my-streamer
> >>> (Re-)joining group my-streamer
> >>> Successfully joined group my-streamer with generation 4
> >>>
> >>> If I was trying to subscribe to the same topic & partition I could
> >>> understand this behaviour, but given that the subscriptions are for
> >>> different input topics, I would have thought this should work?
> >>>
> >>> Thanks,
> >>> Henry
> >>>
> >>> --
> >>> Henry Thacker
> >>>
> >>>
> >>>
>
>


Re: [ANNOUNCE] New committer: Rajini Sivaram

2017-04-24 Thread Michael Noll
Congratulations, Rajini!

On Mon, Apr 24, 2017 at 11:50 PM, Ismael Juma  wrote:

> Congrats Rajini! Well-deserved. :)
>
> Ismael
>
> On Mon, Apr 24, 2017 at 10:06 PM, Gwen Shapira  wrote:
>
> > The PMC for Apache Kafka has invited Rajini Sivaram as a committer and we
> > are pleased to announce that she has accepted!
> >
> > Rajini contributed 83 patches, 8 KIPs (all security and quota
> > improvements) and a significant number of reviews. She is also on the
> > conference committee for Kafka Summit, where she helped select content
> > for our community event. Through her contributions she's shown good
> > judgement, good coding skills, willingness to work with the community on
> > finding the best
> > solutions and very consistent follow through on her work.
> >
> > Thank you for your contributions, Rajini! Looking forward to many more :)
> >
> > Gwen, for the Apache Kafka PMC
> >
>


Re: Joining on non-keyed values - how to lookup fields

2017-04-20 Thread Michael Noll
Jon,

the recently introduced GlobalKTable ("global tables") allow you to perform
non-key lookups.
See
http://docs.confluent.io/current/streams/developer-guide.html#kstream-globalktable-join
(and the javadocs link)

> So called "internal" values can't be looked up.

If I understand you correctly:  GlobalKTables allow you to do that.  You
can provide a KeyValueMapper with which you can tell Kafka Streams against
which on-the-fly-computed "new key" the global-table lookup should be
performed.  For example, if your GlobalKTable has String keys and JSON
values, you can perform lookups against particular fields in the JSON
payload.
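A minimal sketch of such a KStream-GlobalKTable join, borrowing the schedules/classes example from the original question (topic names, the value format, and the field extraction are made up, and this uses a newer Streams API than 0.10.2):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class GlobalKTableLookupSketch {
  public static StreamsBuilder topology() {
    StreamsBuilder builder = new StreamsBuilder();

    // Schedules keyed by schedule ID; the class ID is embedded in the *value* ("classId|rest").
    KStream<String, String> schedules =
        builder.stream("schedules", Consumed.with(Serdes.String(), Serdes.String()));

    // Classes keyed by class ID.
    GlobalKTable<String, String> classes =
        builder.globalTable("classes", Consumed.with(Serdes.String(), Serdes.String()));

    // The KeyValueMapper (second argument) computes the lookup key on the fly from the
    // stream record's value -- this is the non-key lookup.
    KStream<String, String> enriched = schedules.join(
        classes,
        (scheduleId, scheduleValue) -> scheduleValue.split("\\|")[0],   // -> class ID
        (scheduleValue, className) -> scheduleValue + ",className=" + className);

    enriched.to("schedules-enriched", Produced.with(Serdes.String(), Serdes.String()));
    return builder;
  }
}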

-Michael


On Thu, Apr 20, 2017 at 2:20 PM, Jon Yeargers 
wrote:

> Id like to further my immersion in kafka-as-database by doing more
> extensive key/val joins. Specifically there are many instances in the DB
> world where one is given a numeric field and needs to lookup the
> appropriate string translation / value. Imagine a record of student/class
> data where all the courses are numbered and one must determine class /
> instructor names for a hard copy.
>
> Something akin to
>
> select  from schedules
>left join classes on schedules.classid = classes.id
>left join teachers on schedules.teacherid = teachers.id
>left join textbooks on schedules.textbookid = textbooks.id
>
> ... and so on.
>
> In the KTable world (AFAICT) this is only possible for the key the source
> record uses. So called "internal" values can't be looked up. I could
> imagine running each record through a 'map' cycle to rearrange the key for
> each lookup column, remap and repeat but this seems a bit onerous. Perhaps
> using a Process step one could use additional streams? Dunno.
>
> Using log-compaction these 'matching/lookup' topics could be kept
> available.
>
> Was reading this missive (
> https://cwiki.apache.org/confluence/display/KAFKA/
> Discussion%3A+Non-key+KTable-KTable+Joins).
> Seems like the right direction but misses this point.
>
> Any thoughts on this? Am I missing an obvious solution? (I hope so - this
> would be a cool use case)
>


Re: Kafka Streams: a back-pressure question for windowed streams

2017-04-20 Thread Michael Noll
Hi there!

In short, Kafka Streams ensures that your application consumes only as much
data (or: as fast) as it can process it.

The main "problem" you might encounter is not that you run into issues with
state stores (like in-memory stores or RocksDB stores), but -- which is a
more general issue -- that your application's *current capacity* (e.g. "I
am running (only) 10 instances of my app") doesn't allow it process the
data as fast as the data is coming in.  As you mentioned, you'd need to
monitor your application (Kafka/Streams exposes several such metrics) and,
if needed, launch additional instances of your application ("Ok, incoming
data load increased by 200%, so it looks like I need to run 10 + 20 = 30
instances of my application").  What you'd typically monitor is the
so-called "consumer lag" of your application.  In a nutshell, if the
consumer lag is 0 (zero), then your app processes the data as fast as it is
arriving.  If the consumer lag is "large" or, more importantly, growing
over time, then this means your application is not capable any longer to
keep up with the incoming data load.

FWIW, we provide guidance on how to properly size and plan the capacity of
your application:
http://docs.confluent.io/current/streams/sizing.html

Hope this helps,
Michael







On Thu, Apr 20, 2017 at 3:46 PM, Tianji Li  wrote:

> Hi there,
>
> I have a doubt regarding how to realize 'back-pressure' for windowed
> streams.
>
> Say I have a pipeline that consumes from a topic on a windowed basis, then
> do some processing (whenever punctuate is called), and produces into
> another topic.
>
> If the incoming rates from all consumers is 10M/second, and the processing
> rate of all the punctuates is something like 5M/second, then two things can
> happen:
>
> - If a in-memory store is used, on-heap memory will be drained gradually
> and finally GC kicks in which leads to unnecessary rebalancing + other
> things.
>
> - If off-heap (RocksDB) is used, then over time, punctuate() will take
> longer and longer time, and finally performance will be terrible +
> something else that I do not know yet.
>
> I understand the reason of these behaviors is that kafka Streams does
> back-pressure by checking consumers buffer sizes, and StreamThread's buffer
> size, but does NOT check state stores.
>
> I think a solution for this is to 'add more Kafka Streams instance'. By
> this I mean maybe today I need a processing rate of 1M/second, and tomorrow
> I need 5M/second. Then a mechanism is needed for Kafka Streams to detect
> this, and inform people who can add new instances either manually or better
> automatically. And while waiting for people to react, the current running
> Kafka Streams applications should not crash but can slow down a little bit
> (by checking the state stores conditions, say number of records cached, or
> total time taken for previous punctuates??).
>
> Am I understanding correctly?
>
> Thanks
> Tianji
>


Re: auto.offset.reset for Kafka streams 0.10.2.0

2017-04-11 Thread Michael Noll
It's also documented at
http://docs.confluent.io/current/streams/developer-guide.html#non-streams-configuration-parameters
.
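For example, a minimal sketch of overriding it from the Streams configuration (the application id and bootstrap servers are made up):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class OffsetResetConfigSketch {
  public static Properties streamsConfig() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Kafka Streams defaults the embedded consumer to "earliest"; override it
    // explicitly if you want the plain-consumer default of "latest" instead.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    return props;
  }
}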

FYI: We have already begun syncing the Confluent docs for Streams into the
Apache Kafka docs for Streams, but there's still quite some work left
(volunteers are welcome :-P).

-Michael


On Tue, Apr 11, 2017 at 8:37 AM, Matthias J. Sax 
wrote:

> Default for Streams is "earliest"
>
> cf.
> https://github.com/apache/kafka/blob/0.10.2.0/streams/
> src/main/java/org/apache/kafka/streams/StreamsConfig.java#L405
>
>
> -Matthias
>
> On 4/10/17 9:41 PM, Mahendra Kariya wrote:
> > This was even my assumption. But I had to explicitly specify
> > auto.offset.reset=latest. Without this config, it started from
> "earliest"!
> >
> > On Tue, Apr 11, 2017 at 10:07 AM, Sachin Mittal 
> wrote:
> >
> >> As far as I know default is latest, if no offsets are found. Otherwise
> it
> >> starts from the offset.
> >>
> >>
> >> On Tue, Apr 11, 2017 at 8:51 AM, Mahendra Kariya <
> >> mahendra.kar...@go-jek.com
> >>> wrote:
> >>
> >>> Hey All,
> >>>
> >>> Is the auto offset reset set to "earliest" by default in Kafka streams
> >>> 0.10.2.0? I thought default was "latest".
> >>>
> >>> I started a new Kafka streams application with a fresh application id
> and
> >>> it started consuming messages from the beginning.
> >>>
> >>
> >
>
>


Re: weird SerializationException when consumer is fetching and parsing record in streams application

2017-03-30 Thread Michael Noll
Sachin,

there's a JIRA that seems related to what you're seeing:
https://issues.apache.org/jira/browse/KAFKA-4740

Perhaps you could check the above and report back?

-Michael




On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll  wrote:

> Hmm, I re-read the stacktrace again. It does look like the value-side
> being the culprit (as Sachin suggested earlier).
>
> -Michael
>
>
> On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll 
> wrote:
>
>> Sachin,
>>
>> you have this line:
>>
>> > builder.stream(Serdes.String(), serde, "advice-stream")
>>
>> Could the problem be that not the record values are causing the problem
>> -- because your value deserializer does try-catch any such errors -- but
>> that the record *keys* are malformed?  The built-in `Serdes.String()` does
>> not try-catch deserialization errors, and from a quick look at the source
>> it seems that the `Fetcher` class (clients/src/main/java/org/apa
>> che/kafka/clients/consumer/internals/Fetcher.java) is throwing your
>> error above ("Error deserializing key/value for partition..."), and the
>> Fetcher is swallowing the more specific SerializationException of
>> `String.Serdes()` (but it will include the original exception/Throwable in
>> its own SerializationException).
>>
>> -Michael
>>
>>
>>
>> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal 
>> wrote:
>>
>>> My streams application does run in debug mode only.
>>> Also I have checked the code around these lines
>>>
>>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
>>> ~[kafka-clients-0.10.2.0.jar:na]
>>>   at org.apache.kafka.common.record.Record.value(Record.java:268)
>>> ~[kafka-clients-0.10.2.0.jar:na]
>>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRec
>>> ord(Fetcher.java:867)
>>> ~[kafka-clients-0.10.2.0.jar:na]
>>>
>>> I don't see any log statement which will give me more information.
>>>
>>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main
>>> /java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
>>>
>>> The issue is happening at this line and perhaps handling the exception
>>> and
>>> setting the value to be null may be better options.
>>> Yes at client side nothing can be done because exception is happening
>>> before this.valueDeserializer.deserialize can be called.
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy 
>>> wrote:
>>>
>>> > The suggestions in that FAQ won't help as it is too late, i.e., the
>>> message
>>> > has already been received into Streams.
>>> > You could create a simple app that uses the Consumer, seeks to the
>>> offset,
>>> > and tries to read the message. If you did this in debug mode you might
>>> find
>>> > out some more information.
>>> >
>>> >
>>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal  wrote:
>>> >
>>> > > Well I try to read that offset via kafka-console-consumer.sh too and
>>> it
>>> > > fails with same error.
>>> > >
>>> > > So was wondering if I can apply any of the suggestion as per
>>> > >
>>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
>>> > corrupted-records-and-deserialization-errors-poison-pill-messages
>>> > >
>>> > > If there is any other was just to get the contents of that message it
>>> > would
>>> > > be helpful.
>>> > >
>>> > > Thanks
>>> > > Sachin
>>> > >
>>> > >
>>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy 
>>> > wrote:
>>> > >
>>> > > > Hi Sachin,
>>> > > >
>>> > > > Have you tried firing up a consumer (non-streams), seeking to that
>>> > offset
>>> > > > on the topic and seeing what the message is? Might be easier to
>>> debug?
>>> > > Like
>>> > > > you say, it is failing in the consumer.
>>> > > > Thanks,
>>> > > > Damian
>>> > > >
>>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal 
>>> wrote:
>>> > > >
>>> > > > > I think I am following the third optio

Re: weird SerializationException when consumer is fetching and parsing record in streams application

2017-03-30 Thread Michael Noll
Hmm, I re-read the stack trace again. It does look like the value side is
the culprit (as Sachin suggested earlier).

-Michael


On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll  wrote:

> Sachin,
>
> you have this line:
>
> > builder.stream(Serdes.String(), serde, "advice-stream")
>
> Could the problem be that not the record values are causing the problem --
> because your value deserializer does try-catch any such errors -- but that
> the record *keys* are malformed?  The built-in `Serdes.String()` does not
> try-catch deserialization errors, and from a quick look at the source it
> seems that the `Fetcher` class (clients/src/main/java/org/
> apache/kafka/clients/consumer/internals/Fetcher.java) is throwing your
> error above ("Error deserializing key/value for partition..."), and the
> Fetcher is swallowing the more specific SerializationException of
> `String.Serdes()` (but it will include the original exception/Throwable in
> its own SerializationException).
>
> -Michael
>
>
>
> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal  wrote:
>
>> My streams application does run in debug mode only.
>> Also I have checked the code around these lines
>>
>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
>> ~[kafka-clients-0.10.2.0.jar:na]
>>   at org.apache.kafka.common.record.Record.value(Record.java:268)
>> ~[kafka-clients-0.10.2.0.jar:na]
>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRec
>> ord(Fetcher.java:867)
>> ~[kafka-clients-0.10.2.0.jar:na]
>>
>> I don't see any log statement which will give me more information.
>>
>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main
>> /java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
>>
>> The issue is happening at this line and perhaps handling the exception and
>> setting the value to be null may be better options.
>> Yes at client side nothing can be done because exception is happening
>> before this.valueDeserializer.deserialize can be called.
>>
>> Thanks
>> Sachin
>>
>>
>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy  wrote:
>>
>> > The suggestions in that FAQ won't help as it is too late, i.e., the
>> message
>> > has already been received into Streams.
>> > You could create a simple app that uses the Consumer, seeks to the
>> offset,
>> > and tries to read the message. If you did this in debug mode you might
>> find
>> > out some more information.
>> >
>> >
>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal  wrote:
>> >
>> > > Well I try to read that offset via kafka-console-consumer.sh too and
>> it
>> > > fails with same error.
>> > >
>> > > So was wondering if I can apply any of the suggestion as per
>> > >
>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
>> > corrupted-records-and-deserialization-errors-poison-pill-messages
>> > >
>> > > If there is any other way just to get the contents of that message it
>> > would
>> > > be helpful.
>> > >
>> > > Thanks
>> > > Sachin
>> > >
>> > >
>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy 
>> > wrote:
>> > >
>> > > > Hi Sachin,
>> > > >
>> > > > Have you tried firing up a consumer (non-streams), seeking to that
>> > offset
>> > > > on the topic and seeing what the message is? Might be easier to
>> debug?
>> > > Like
>> > > > you say, it is failing in the consumer.
>> > > > Thanks,
>> > > > Damian
>> > > >
>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal 
>> wrote:
>> > > >
>> > > > > I think I am following the third option.
>> > > > >
>> > > > > My pipeline is:
>> > > > >
>> > > > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer ());
>> > > > >
>> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
>> > > > >   .filter(new Predicate() { ...})
>> > > > >   .groupByKey()
>> > > > >   .aggregate(new Initializer() {...}, new Aggregator> V,
>> > > V1>()
>> > > > > {...}, windows, supplier)
>> > > > >   .mapValues(new ValueMapper() { ... })
>> > > > >   .foreach(new ForeachAction, V2>

Re: weird SerializationException when consumer is fetching and parsing record in streams application

2017-03-30 Thread Michael Noll
Sachin,

you have this line:

> builder.stream(Serdes.String(), serde, "advice-stream")

Could the problem be not that the record values are malformed -- your value
deserializer does try-catch such errors -- but that the record *keys* are?
The built-in `Serdes.String()` does not try-catch deserialization errors,
and from a quick look at the source it seems that the `Fetcher` class
(clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java)
is throwing the error you see above ("Error deserializing key/value for
partition..."), and the Fetcher swallows the more specific
SerializationException of `Serdes.String()` (but it does include the
original exception/Throwable in its own SerializationException).
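
If you want to rule out the key side, one option is to wrap the built-in
String deserializer so that a malformed key surfaces as null instead of an
exception. A rough sketch (the class name is mine, and whether a null key is
acceptable downstream depends on your topology):

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Sketch: delegate to StringDeserializer, but never throw on malformed input.
public class LenientStringDeserializer implements Deserializer<String> {

    private final StringDeserializer delegate = new StringDeserializer();

    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {
        delegate.configure(configs, isKey);
    }

    @Override
    public String deserialize(final String topic, final byte[] data) {
        try {
            return delegate.deserialize(topic, data);
        } catch (final RuntimeException e) {
            // Malformed key: return null instead of failing the whole fetch.
            return null;
        }
    }

    @Override
    public void close() {
        delegate.close();
    }
}

You could then plug it in for the key serde via
`Serdes.serdeFrom(new StringSerializer(), new LenientStringDeserializer())`
instead of `Serdes.String()`.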

-Michael



On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal  wrote:

> My streams application does run in debug mode only.
> Also I have checked the code around these lines
>
>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
> ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.common.record.Record.value(Record.java:268)
> ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.clients.consumer.internals.Fetcher.
> parseRecord(Fetcher.java:867)
> ~[kafka-clients-0.10.2.0.jar:na]
>
> I don't see any log statement which will give me more information.
>
> https://github.com/apache/kafka/blob/0.10.2/clients/src/
> main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
>
> The issue is happening at this line and perhaps handling the exception and
> setting the value to null may be a better option.
> Yes, on the client side nothing can be done because the exception happens
> before this.valueDeserializer.deserialize can be called.
>
> Thanks
> Sachin
>
>
> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy  wrote:
>
> > The suggestions in that FAQ won't help as it is too late, i.e., the
> message
> > has already been received into Streams.
> > You could create a simple app that uses the Consumer, seeks to the
> offset,
> > and tries to read the message. If you did this in debug mode you might
> find
> > out some more information.
> >
> >
> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal  wrote:
> >
> > > Well I try to read that offset via kafka-console-consumer.sh too and it
> > > fails with same error.
> > >
> > > So was wondering if I can apply any of the suggestion as per
> > >
> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
> > corrupted-records-and-deserialization-errors-poison-pill-messages
> > >
> > > If there is any other way just to get the contents of that message it
> > would
> > > be helpful.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy 
> > wrote:
> > >
> > > > Hi Sachin,
> > > >
> > > > Have you tried firing up a consumer (non-streams), seeking to that
> > offset
> > > > on the topic and seeing what the message is? Might be easier to
> debug?
> > > Like
> > > > you say, it is failing in the consumer.
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal 
> wrote:
> > > >
> > > > > I think I am following the third option.
> > > > >
> > > > > My pipeline is:
> > > > >
> > > > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer ());
> > > > >
> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
> > > > >   .filter(new Predicate() { ...})
> > > > >   .groupByKey()
> > > > >   .aggregate(new Initializer() {...}, new Aggregator > > V1>()
> > > > > {...}, windows, supplier)
> > > > >   .mapValues(new ValueMapper() { ... })
> > > > >   .foreach(new ForeachAction, V2>() {... });
> > > > >
> > > > >
> > > > > and In VDeserializer (implements Deserializer) I am doing
> > something
> > > > like
> > > > > this:
> > > > >
> > > > > public V deserialize(String paramString, byte[]
> > paramArrayOfByte) {
> > > > > if (paramArrayOfByte == null) { return null;}
> > > > > V data = null;
> > > > > try {
> > > > > data = objectMapper.readValue(paramArrayOfByte, new
> > > > > TypeReference() {});
> > &

Re: Understanding ReadOnlyWindowStore.fetch

2017-03-30 Thread Michael Noll
Jon,

perhaps you could share the full integration test (or whatever code you're
using to experiment)?  We had a similar "how does X work?" question on
StackOverflow recently [1], and it was much easier to help once we also
understood exactly how the test data was being generated.

-Michael




[1]
https://stackoverflow.com/questions/43038653/how-to-actually-discard-late-records


On Thu, Mar 30, 2017 at 1:53 AM, Jon Yeargers 
wrote:

> I remain more than mystified about the workings of the StateStore. I tried
> making aggregations with a 1minute window, 10 second advance and a _12
> hour_ retention (which is longer than the retention.ms of the topic).  I
> still couldn't get more than a 15% hit rate on the StateStore.
>
> Are there configuration settings? Some properties file to setup RocksDB? Im
> not getting any errors - just not getting any data.
>
> On Wed, Mar 29, 2017 at 12:52 PM, Jon Yeargers 
> wrote:
>
> > So '.until()' is based on clock time / elapsed time (IE record age) /
> > something else?
> >
> > The fact that Im seeing lots of records come through that can't be found
> > in the Store - these are 'old' and already expired?
> >
> > Going forward - it would be useful to have different forms of '.until()'
> > so one could consume old records (EG if one was catching up from lag)
> > without having to worry about them immediately disappearing.
> >
> > On Wed, Mar 29, 2017 at 10:37 AM, Damian Guy 
> wrote:
> >
> >> Jon,
> >>
> >> You should be able to query anything that has not expired, i.e., based
> on
> >> TimeWindows.until(..).
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Wed, 29 Mar 2017 at 17:24 Jon Yeargers 
> >> wrote:
> >>
> >> > To be a bit more specific:
> >> >
> >> > If I call this: KTable kt =
> >> > sourceStream.groupByKey().reduce(..., "somekeystore");
> >> >
> >> > and then call this:
> >> >
> >> > kt.forEach()-> ...
> >> >
> >> > Can I assume that everything that comes out will be available in
> >> > "somekeystore"? If not, what subset should I expect to find there?
> >> >
> >> > On Wed, Mar 29, 2017 at 8:34 AM, Jon Yeargers <
> jon.yearg...@cedexis.com
> >> >
> >> > wrote:
> >> >
> >> > > But if a key shows up in a KTable->forEach should it be available in
> >> the
> >> > > StateStore (from the KTable)?
> >> > >
> >> > > On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll  >
> >> > > wrote:
> >> > >
> >> > >> Jon,
> >> > >>
> >> > >> there's a related example, using a window store and a key-value
> >> store,
> >> > at
> >> > >> https://github.com/confluentinc/examples/blob/3.2.x/kafka-
> >> > >> streams/src/test/java/io/confluent/examples/streams/Val
> >> > >> idateStateWithInteractiveQueriesLambdaIntegrationTest.java
> >> > >> (this is for Confluent 3.2 / Kafka 0.10.2).
> >> > >>
> >> > >> -Michael
> >> > >>
> >> > >>
> >> > >>
> >> > >> On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers <
> >> jon.yearg...@cedexis.com
> >> > >
> >> > >> wrote:
> >> > >>
> >> > >> > Im only running one instance (locally) to keep things simple.
> >> > >> >
> >> > >> > Reduction:
> >> > >> >
> >> > >> > KTable, String> hourAggStore =
> >> > >> > sourceStream.groupByKey().reduce(rowReducer,
> >> > >> > TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60
> *
> >> > >> > 1000).until(70 * 60 * 1000L),
> >> > >> > "HourAggStore");
> >> > >> >
> >> > >> > then I get values to look for via:
> >> > >> >
> >> > >> > hourAggStore.foreach((k, v) -> {
> >> > >> > LogLine logLine = objectMapper.readValue(v,
> >> > >> logLine.class);
> >> > >> > LOGGER.debug("{}", k.key());
> >> > >> > });
> >> > >> >
> >> > >>

Re: weird SerializationException when consumer is fetching and parsing record in streams application

2017-03-30 Thread Michael Noll
Could this be a corrupted message ("poison pill") in your topic?

If so, take a look at
http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
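
One of the workarounds mentioned further down in this thread is to manually
seek past the corrupted offset and commit. Done programmatically rather than
by hand, that could look roughly like the sketch below. Topic, partition, and
offset are taken from the stack trace quoted below; the group id must match
your Streams application.id (an assumption on my part), and the Streams app
should be stopped while you run this, otherwise the commit will be rejected.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class SkipPoisonPill {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Must be the application.id of the Streams app so that the app picks
        // up the new committed offset on restart.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-streams-application-id");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        final TopicPartition tp = new TopicPartition("advice-stream", 6);
        final long corruptedOffset = 45153795L;

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            // Commit the position *after* the corrupted record so it is never fetched again.
            consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(corruptedOffset + 1)));
        }
    }
}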

FYI: We're currently investigating a more elegant way to address such
poison pill problems.  If you have feedback on that front, feel free to
share it with us. :-)

-Michael




On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal  wrote:

> Hi,
> This is the first time we are getting this weird exception.
> After this the streams app crashes.
>
> The only workaround is to manually seek and commit the offset to a greater
> number, and we need this manual intervention again and again.
>
> Any idea what is causing it and how we can circumvent this?
>
> Note this error happens in both cases, whether the 10.2 client or the 10.1.1
> client connects to the 10.1.1 Kafka server.
>
> So this does not look like a version issue.
>
> Also we have following setting
> message.max.bytes=513
> ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
>
> Rest is all default and also increasing the value for
> ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help.
>
> Stack trace below.
>
> Thanks
> Sachin
>
>
> org.apache.kafka.common.errors.SerializationException: Error deserializing
> key/value for partition advice-stream-6 at offset 45153795
> java.lang.IllegalArgumentException: null
>   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
> ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.common.record.Record.value(Record.java:268)
> ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.clients.consumer.internals.Fetcher.
> parseRecord(Fetcher.java:867)
> ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.clients.consumer.internals.Fetcher.
> parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.clients.consumer.internals.Fetcher.
> fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:995)
> ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:592)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-
> SNAPSHOT.jar:na]
>


Re: ThoughtWorks Tech Radar: Assess Kafka Streams

2017-03-30 Thread Michael Noll
Aye!  Thanks for sharing, Jan. :-)

On Wed, Mar 29, 2017 at 8:56 PM, Eno Thereska 
wrote:

> Thanks for the heads up Jan!
>
> Eno
>
> > On 29 Mar 2017, at 19:08, Jan Filipiak  wrote:
> >
> > Regardless of how useful you find the tech radar.
> >
> > Well deserved! Even though we all here agree that trial or adopt is within
> reach.
> >
> > https://www.thoughtworks.com/radar/platforms/kafka-streams
> >
> > Best Jan
> >
> >
>
>


Re: Using Kafka Stream in the cluster of kafka on one or multiple docker-machine/s

2017-03-30 Thread Michael Noll
If you want to deploy a Kafka Streams application, then essentially you
only need the (fat) jar of your application and a JRE in your container.
In other words, it's the same setup you'd use to deploy *any* kind of Java
application.

You do not need to containerize "Kafka", by which I assume you mean
containerizing "a Kafka cluster or broker".

> And after I have the container running, in my container should I run Java
-cp ... same as
> https://github.com/confluentinc/examples/blob/3.
2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L55-L62?

Yes.


-Michael



On Thu, Mar 30, 2017 at 4:23 AM, Mina Aslani  wrote:

> Hi,
>
> Do we have an example of a container with an instance of the jar file by
> any chance? I am wondering if I should have a container of headless java or
> should I have a container of Kafka?
>
> And after I have the container running, in my container should I run Java
> -cp ... same as https://github.com/confluentinc/examples/blob/3.
> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
> WordCountLambdaExample.java#L55-L62?
>
> Regards,
> Mina
>
> On Tue, Mar 21, 2017 at 4:49 PM, Mina Aslani  wrote:
>
> > Hi Michael,
> >
> > Thank you very much for the prompt response, really appreciate it!
> >
> > From https://github.com/confluentinc/examples/blob/3.2.x/
> > kafka-streams/src/main/java/io/confluent/examples/streams/
> > WordCountLambdaExample.java#L55-L62 and
> > https://github.com/confluentinc/examples/tree/3.2.x/kafka-
> > streams#packaging-and-running I missed the fact that the jar should be
> > run in a separate container.
> >
> > Best regards,
> > Mina
> >
> > On Tue, Mar 21, 2017 at 4:34 PM, Michael Noll 
> > wrote:
> >
> >> Typically you'd containerize your app and then launch e.g. 10 containers
> >> if
> >> you need to run 10 instances of your app.
> >>
> >> Also, what do you mean by "in a cluster of Kafka containers" and "in the
> >> cluster of Kafkas"?
> >>
> >> On Tue, Mar 21, 2017 at 9:08 PM, Mina Aslani 
> >> wrote:
> >>
> >> > Hi,
> >> >
> >> > I am trying to understand how I can use a kafka stream app(jar file)
> in
> >> a
> >> > cluster of kafka containers.
> >> >
> >> > Kafka does not have master/slave concept (unlike spark), how I should
> >> run
> >> > my app in the cluster of kafkas (e.g. on one or multiple
> >> docker-machine/s)?
> >> >
> >> > I use below command line when having one VM/node with one kafka
> >> container
> >> > https://github.com/confluentinc/examples/blob/3.
> >> > 2.x/kafka-streams/src/main/
> >> > java/io/confluent/examples/streams/WordCountLambdaExample.java#
> L55-L62
> >> >
> >> > Best regards,
> >> > Mina
> >> >
> >>
> >
> >
>


Re: Understanding ReadOnlyWindowStore.fetch

2017-03-29 Thread Michael Noll
Jon,

there's a related example, using a window store and a key-value store, at
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java
(this is for Confluent 3.2 / Kafka 0.10.2).

-Michael



On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers 
wrote:

> Im only running one instance (locally) to keep things simple.
>
> Reduction:
>
> KTable, String> hourAggStore =
> sourceStream.groupByKey().reduce(rowReducer,
> TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 *
> 1000).until(70 * 60 * 1000L),
> "HourAggStore");
>
> then I get values to look for via:
>
> hourAggStore.foreach((k, v) -> {
> LogLine logLine = objectMapper.readValue(v, logLine.class);
> LOGGER.debug("{}", k.key());
> });
>
> Ive kept it easy by requesting everything from 0 to
> 'System.currentTimeMillis()'. Retrieval is done using a snip from your
> sample code "windowedByKey".
>
> Requests are sent in via curl and output through the same channel. I pass
> in the key and ask for any values.
>
> Ive looked at the values passed in / out of the reduction function and they
> look sane.
>
> My assumption is that if a value shows up in the 'forEach' loop this
> implies it exists in the StateStore. Accurate?
>
> In fact, only about one in 10 requests actually return any values. No
> errors - just no data.
>
>
>
> On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy  wrote:
>
> > Hi Jon,
> >
> > If you are able to get a handle on the store, i.e., via
> > KafkaStreams.store(...) and call fetch without any exceptions, then the
> > store is available.
> > The time params to fetch are the boundaries to search for windows for the
> > given key. They relate to the start time of the window, so if you did
> > fetch(key, t1, t2) - it will find all the windows for key that start in
> the
> > inclusive time range t1 - t2.
> >
> > Are you running more than one instance? If yes, then you want to make
> sure
> > that you are querying the correct instance. For that you can use:
> > KafkaStreams.metadataForKey(...) to find the instance that has the key
> you
> > are looking for.
> >
> > Thanks,
> > Damian
> >
> >
> >
> > On Tue, 28 Mar 2017 at 22:37 Jon Yeargers 
> > wrote:
> >
> > > Im probing about trying to find a way to solve my aggregation -> db
> > issue.
> > > Looking at the '.fetch()'  function Im wondering about the 'timeFrom'
> and
> > > 'timeTo' params as not a lot is mentioned about 'proper' usage.
> > >
> > > The test in
> > >
> > > https://github.com/confluentinc/examples/blob/
> > master/kafka-streams/src/test/java/io/confluent/examples/
> > streams/interactivequeries/WordCountInteractiveQueriesExa
> > mpleTest.java#L200-L212
> > > makes it appear that the params are boundaries and that it will return
> an
> > > inclusive list of every key/window combination. Truth?
> > >
> > > My tests to this end haven't returned anything.
> > >
> > > Im watching the values coming out of the KTable so I
> can
> > > send them back as request params. What Ive tried:
> > >
> > > - Window.key(), Window.key().start() and Window.key().end()
> > > - Window.key(), (Window.key().start() - 1) and (Window.key().end() + 1)
> > > - Window.key(), 0 and Window.key().end()
> > > - Window.key(), 0 and (Window.key().end() + 1)
> > >
> > > None of these seem to hit anything in the StateStore.
> > >
> > > Is there a delay before Store values become available for '.fetch()'?
> > >
> >
>


Re: Custom stream processor not triggering #punctuate()

2017-03-28 Thread Michael Noll
Elliot,

in the current API, `punctuate()` is called based on the current
stream-time (which defaults to event-time), not based on the current
wall-clock time / processing-time.  See http://docs.confluent.io/
current/streams/faq.html#why-is-punctuate-not-called.  The stream-time is
advanced only when new input records are coming in, so if there's e.g. a
stall on incoming records, then `punctuate()` will not be called.

If you need to schedule a call every N minutes of wall-clock time you'd
need to use your own scheduler.
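
A minimal sketch of the "own scheduler" route, using plain Java; note that
whatever you run there must not call back into the ProcessorContext, because
that is only safe from the stream thread itself:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(
    () -> {
        // Runs every 30 seconds of wall-clock time, regardless of whether new
        // records arrive -- e.g. query a state store via interactive queries
        // here, but do not call context.forward() or context.commit().
        System.out.println("wall-clock tick");
    },
    30, 30, TimeUnit.SECONDS);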

Does that help?
Michael



On Tue, Mar 28, 2017 at 10:58 AM, Elliot Crosby-McCullough <
elliot.crosby-mccullo...@freeagent.com> wrote:

> Hi there,
>
> I've written a simple processor which expects to have #process called on it
> for each message and configures regular punctuate calls via
> `context.schedule`.
>
> Regardless of what configuration I try for timestamp extraction I cannot
> get #punctuate to be called, despite #process being called for every
> message (which are being sent several seconds apart).  I've set the
> schedule as low as 1 (though the docs aren't clear whether that's micro,
> milli, or just seconds) and tried both the wallclock time extractor and the
> default time extractor in both the global config and the state store serde.
>
> These particular messages are being generated by another kafka streams DSL
> application and I'm using kafka 0.10.2.0, so presumably they also have
> automatically embedded timestamps.
>
> I can't for the life of me figure out what's going on.  Could you clue me
> in?
>
> Thanks,
> Elliot
>


Re: APPLICATION_SERVER_CONFIG ?

2017-03-27 Thread Michael Noll
Yes, agreed -- the re-thinking of pre-existing notions is a big part of such
conversations.  A bit like making the mental switch from object-oriented
programming to functional programming -- and, just like in this case,
neither is more "right" than the other.  Personal
opinion/preference/context matters a lot, hence I tried to carefully phrase
my answer in a way that it doesn't come across as potentially
indoctrinating. ;-)

On Fri, Mar 24, 2017 at 6:34 PM, Jon Yeargers 
wrote:

> You make some great cases for your architecture. To be clear - Ive been
> proselytizing for kafka since I joined this company last year. I think my
> largest issue is rethinking some preexisting notions about streaming to
> make them work in the kstream universe.
>
> On Fri, Mar 24, 2017 at 6:07 AM, Michael Noll 
> wrote:
>
> > > If I understand this correctly: assuming I have a simple aggregator
> > > distributed across n-docker instances each instance will _also_ need to
> > > support some sort of communications process for allowing access to its
> > > statestore (last param from KStream.groupby.aggregate).
> >
> > Yes.
> >
> > See
> > http://docs.confluent.io/current/streams/developer-
> > guide.html#your-application-and-interactive-queries
> > .
> >
> > > - The tombstoning facilities of redis or C* would lend themselves well
> to
> > > implementing a 'true' rolling aggregation
> >
> > What is a 'true' rolling aggregation, and how could Redis or C* help with
> > that in a way that Kafka can't?  (Honest question.)
> >
> >
> > > I get that RocksDB has a small footprint but given the choice of
> > > implementing my own RPC / gossip-like process for data sharing and
> using
> > a
> > > well tested one (ala C* or redis) I would almost always opt for the
> > latter.
> > > [...]
> > > Just my $0.02. I would love to hear why Im missing the 'big picture'.
> The
> > > kstreams architecture seems rife with potential.
> >
> > One question is, for example:  can the remote/central DB of your choice
> > (Redis, C*) handle as many qps as Kafka/Kafka Streams/RocksDB can handle?
> > Over the network?  At the same low latency?  Also, what happens if the
> > remote DB is unavailable?  Do you wait and retry?  Discard?  Accept the
> > fact that your app's processing latency will now go through the roof?  I
> > wrote about some such scenarios at
> > https://www.confluent.io/blog/distributed-real-time-joins-
> > and-aggregations-on-user-activity-events-using-kafka-streams/
> > .
> >
> > One big advantage (for many use cases, though not all) with Kafka/Kafka Streams
> is
> > that you can leverage fault-tolerant *local* state that may also be
> > distributed across app instances.  Local state is much more efficient and
> > faster when doing stateful processing such as joins or aggregations.  You
> > don't need to worry about an external system, whether it's up and
> running,
> > whether its version is still compatible with your app, whether it can
> scale
> > as much as your app/Kafka Streams/Kafka/the volume of your input data.
> >
> > Also, note that some users have actually opted to run hybrid setups:
> Some
> > processing output is sent to a remote data store like Cassandra (e.g. via
> > Kafka Connect), some processing output is exposed directly through
> > interactive queries. It's not like you're forced to pick only one
> approach.
> >
> >
> > > - Typical microservices would separate storing / retrieving data
> >
> > I'd rather argue that for microservices you'd oftentimes prefer to *not*
> > use a remote DB, and rather do everything inside your microservice
> whatever
> > the microservice needs to do (perhaps we could relax this to "do
> everything
> > in a way that your microservice is in full, exclusive control", i.e. it
> > doesn't necessarily need to be *inside*, but arguably it would be better
> if
> > it actually is).
> > See e.g. the article
> > https://www.confluent.io/blog/data-dichotomy-rethinking-the-
> > way-we-treat-data-and-services/
> > that lists some of the reasoning behind this school of thinking.  Again,
> > YMMV.
> >
> > Personally, I think there's no simple true/false here.  The decisions
> > depend on what you need, what your context is, etc.  Anyways, since you
> > already have some opinions for the one side, I wanted to share some food
> > for thought for the other side of the argument. :-)
> >
> > Best,
> &g

Re: using a state store for deduplication

2017-03-27 Thread Michael Noll
Jon,

Damian already answered your direct question, so my comment is a FYI:

There's a demo example at
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
(this is for Confluent 3.2 / Kafka 0.10.2.0).

Note that this code is for demonstration purposes.  To make the example
more suitable to production use cases you could e.g. switch to a window
store instead of manually purging expired entries via
`ReadOnlyKeyValueStore#all()` (which might be an expensive
operation/iteration).
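
Stripped down to the core idea, the pattern looks roughly like the sketch
below (this is not a copy of that example; class and store names are mine, and
the "dedup-store" must be registered on the builder via addStateStore() and
named in KStream#transform()):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch: drop a record if its key has been seen before; purging of expired
// entries (as discussed above) is omitted here.
public class DeduplicationTransformer
        implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> seen;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        seen = (KeyValueStore<String, Long>) context.getStateStore("dedup-store");
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        if (seen.get(key) != null) {
            return null; // duplicate -> forward nothing
        }
        seen.put(key, context.timestamp());
        return KeyValue.pair(key, value);
    }

    @Override
    public KeyValue<String, String> punctuate(final long timestamp) {
        return null; // no periodic output
    }

    @Override
    public void close() {
    }
}

Wiring it up is then something like
`stream.transform(DeduplicationTransformer::new, "dedup-store")`.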

Hope this helps,
Michael




On Mon, Mar 27, 2017 at 3:07 PM, Damian Guy  wrote:

> Jon,
> You don't need all the data for every topic as the data is partitioned by
> key. Therefore each state-store instance is de-duplicating a subset of the
> key set.
> Thanks,
> Damian
>
> On Mon, 27 Mar 2017 at 13:47 Jon Yeargers 
> wrote:
>
> > Ive been (re)reading this document(
> > http://docs.confluent.io/3.2.0/streams/developer-guide.html#state-stores
> )
> > hoping to better understand StateStores. At the top of the section there
> is
> > a tantalizing note implying that one could do deduplication using a
> store.
> >
> > At present we using Redis for this as it gives us a shared location. Ive
> > been of the mind that a given store was local to a streams instance. To
> > truly support deduplication I would think one would need access to _all_
> > the data for a topic and not just on a per-partition basis.
> >
> > Am I completely misunderstanding this?
> >
>


Re: YASSQ (yet another state store question)

2017-03-27 Thread Michael Noll
IIRC this may happen, for example, if the first host runs all the stream
tasks (here: 2 in total) and migration of stream task(s) to the second host
hasn't happened yet.
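
FWIW, the usual way to handle that window on the querying side is to retry
until the store has been (re)assigned to the local instance -- a sketch,
assuming blocking and retrying is acceptable in your app:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreType;

public static <T> T waitUntilStoreIsQueryable(final KafkaStreams streams,
                                              final String storeName,
                                              final QueryableStoreType<T> storeType)
        throws InterruptedException {
    while (true) {
        try {
            return streams.store(storeName, storeType);
        } catch (final InvalidStateStoreException notReadyYet) {
            // Store not queryable yet (e.g. tasks still being migrated/restored).
            Thread.sleep(100);
        }
    }
}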

-Michael



On Sun, Mar 26, 2017 at 3:14 PM, Jon Yeargers 
wrote:

> Also - if I run this on two hosts - what does it imply if the response to
> 'streams.allMetadata()' from one host includes both instances but the other
> host only knows about itself?
>
> On Sun, Mar 26, 2017 at 5:58 AM, Jon Yeargers 
> wrote:
>
> > If the '.state()' function returns "RUNNING" and I still get this
> > exception?
> >
> > On Fri, Mar 24, 2017 at 1:56 PM, Eno Thereska 
> > wrote:
> >
> >> Hi Jon,
> >>
> >> This is expected, see this:
> >> https://groups.google.com/forum/?pli=1#!searchin/confluent-platform/migrated$20to$20another$20instance%7Csort:relevance/confluent-platform/LglWC_dZDKw/qsPuCRT_DQAJ
> >>
> >> Thanks
> >> Eno
> >> > On 24 Mar 2017, at 20:51, Jon Yeargers 
> >> wrote:
> >> >
> >> > I've setup a KTable as follows:
> >> >
> >> > KTable, String> outTable = sourceStream.groupByKey().
> >> > reduce(rowReducer,
> >> >TimeWindows.of(5 * 60 * 1000L).advanceBy(1 * 60 *
> >> > 1000).until(10 * 60 * 1000L),
> >> >"AggStore");
> >> >
> >> > I can confirm its presence via 'streams.allMetadata()' (accessible
> >> through
> >> > a simple httpserver).
> >> >
> >> > When I call 'ReadOnlyKeyValueStore store =
> >> > kafkaStreams.store("AggStore", QueryableStoreTypes.keyValueStore());'
> >> >
> >> > I get this exception:
> >> >
> >> > org.apache.kafka.streams.errors.InvalidStateStoreException: the state
> >> > store, AggStore, may have migrated to another instance.
> >> >at
> >> > org.apache.kafka.streams.state.internals.QueryableStoreProvi
> >> der.getStore(QueryableStoreProvider.java:49)
> >> >at
> >> > org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:378)
> >> >at
> >> > com.cedexis.videokafka.videohouraggregator.RequestHandler.
> >> handle(RequestHandler.java:97)
> >> >at com.sun.net.httpserver.Filter$
> Chain.doFilter(Filter.java:79)
> >> >at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
> >> >at com.sun.net.httpserver.Filter$
> Chain.doFilter(Filter.java:82)
> >> >at
> >> > sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(Se
> >> rverImpl.java:675)
> >> >at com.sun.net.httpserver.Filter$
> Chain.doFilter(Filter.java:79)
> >> >at sun.net.httpserver.ServerImpl$
> Exchange.run(ServerImpl.java:6
> >> 47)
> >> >at
> >> > sun.net.httpserver.ServerImpl$DefaultExecutor.execute(Server
> >> Impl.java:158)
> >> >at
> >> > sun.net.httpserver.ServerImpl$Dispatcher.handle(ServerImpl.java:431)
> >> >at sun.net.httpserver.ServerImpl$
> Dispatcher.run(ServerImpl.java
> >> :396)
> >> >at java.lang.Thread.run(Thread.java:745)
> >> >
> >> >
> >> > ... except.. there is only one instance.. running locally.
> >>
> >>
> >
>


Re: Streams RocksDBException with no message?

2017-03-27 Thread Michael Noll
We're talking about `ulimit` (CLI tool) and the `nofile` limit (number of
open files), which you can access via `ulimit -n`.

Examples:
https://access.redhat.com/solutions/61334
https://stackoverflow.com/questions/21515463/how-to-increase-maximum-file-open-limit-ulimit-in-ubuntu

Depending on the operating system, the default setting is often pretty low
(e.g. 1024).  Bump this up to something higher, like 16k or 32k.  Of course, an
even better approach is to monitor this metric on your servers/brokers, and
-- with this collected information -- bump the setting to a reasonable
value for your environment.

-Michael



On Sun, Mar 26, 2017 at 7:41 PM, Sachin Mittal  wrote:

> Hi,
> Could you please tell us what did you change for ulimit and how.
>
> We also are seem to be facing same issue.
>
> Thanks
> Sachin
>
>
> On Tue, Mar 21, 2017 at 9:22 PM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
> > Thanks Guozhang.
> >
> > For my part, turns out I was hitting ulimit on my open file descriptors.
> > Phew, easy to fix... once I figured it out. :-)
> >
> > Mathieu
> >
> >
> > On Fri, Mar 17, 2017 at 4:14 PM, Guozhang Wang 
> wrote:
> >
> > > Hi Mathieu,
> > >
> > > We are aware of that since long time ago and I have been looking into
> > this
> > > issue, turns out to be a known issue in RocksDB:
> > >
> > > https://github.com/facebook/rocksdb/issues/1688
> > >
> > > And the corresponding fix (https://github.com/facebook/
> rocksdb/pull/1714
> > )
> > > has been merged in master but marked for
> > >
> > >- v5.1.4 
> > >
> > > only while the latest release is 5.1.2.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Mar 17, 2017 at 10:27 AM, Mathieu Fenniak <
> > > mathieu.fenn...@replicon.com> wrote:
> > >
> > > > Hey all,
> > > >
> > > > So... what does it mean to have a RocksDBException with a message
> that
> > > just
> > > > has a single character?  "e", "q", "]"... I've seen a few.  Has
> anyone
> > > seen
> > > > this before?
> > > >
> > > > Two example exceptions:
> > > > https://gist.github.com/mfenniak/c56beb6d5058e2b21df0309aea224f12
> > > >
> > > > Kafka Streams 0.10.2.0.  Both of these errors occurred during state
> > store
> > > > initialization.  I'm running a single Kafka Streams thread per
> server,
> > > this
> > > > occurred on two servers about a half-hour apart.
> > > >
> > > > Mathieu
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>


Re: APPLICATION_SERVER_CONFIG ?

2017-03-24 Thread Michael Noll
> If I understand this correctly: assuming I have a simple aggregator
> distributed across n-docker instances each instance will _also_ need to
> support some sort of communications process for allowing access to its
> statestore (last param from KStream.groupby.aggregate).

Yes.

See
http://docs.confluent.io/current/streams/developer-guide.html#your-application-and-interactive-queries
.
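
Since the thread subject asks about APPLICATION_SERVER_CONFIG specifically,
here is a sketch of the two pieces involved: advertising this instance's RPC
endpoint, and then using the metadata to find which instance holds a given
key. The host/port, application id, and store name are placeholders, `builder`
is your KStreamBuilder, and the RPC layer itself (e.g. a small HTTP server) is
something you have to provide:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.StreamsMetadata;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-aggregator");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Advertise where *this* instance's RPC endpoint can be reached, so that
// other instances can route interactive queries to it.
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "host1:7070");

final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
streams.start();

// Later, to answer a query for some key:
final StreamsMetadata metadata =
    streams.metadataForKey("my-store", "some-key", Serdes.String().serializer());
// If metadata.host()/metadata.port() is not this instance, forward the
// request to that instance over your RPC layer.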

> - The tombstoning facilities of redis or C* would lend themselves well to
> implementing a 'true' rolling aggregation

What is a 'true' rolling aggregation, and how could Redis or C* help with
that in a way that Kafka can't?  (Honest question.)


> I get that RocksDB has a small footprint but given the choice of
> implementing my own RPC / gossip-like process for data sharing and using a
> well tested one (ala C* or redis) I would almost always opt for the
latter.
> [...]
> Just my $0.02. I would love to hear why Im missing the 'big picture'. The
> kstreams architecture seems rife with potential.

One question is, for example:  can the remote/central DB of your choice
(Redis, C*) handle as many qps as Kafka/Kafka Streams/RocksDB can handle?
Over the network?  At the same low latency?  Also, what happens if the
remote DB is unavailable?  Do you wait and retry?  Discard?  Accept the
fact that your app's processing latency will now go through the roof?  I
wrote about some such scenarios at
https://www.confluent.io/blog/distributed-real-time-joins-and-aggregations-on-user-activity-events-using-kafka-streams/
.

One big advantage (for many use cases, though not all) with Kafka/Kafka Streams is
that you can leverage fault-tolerant *local* state that may also be
distributed across app instances.  Local state is much more efficient and
faster when doing stateful processing such as joins or aggregations.  You
don't need to worry about an external system, whether it's up and running,
whether its version is still compatible with your app, whether it can scale
as much as your app/Kafka Streams/Kafka/the volume of your input data.

Also, note that some users have actually opted to run hybrid setups:  Some
processing output is sent to a remote data store like Cassandra (e.g. via
Kafka Connect), some processing output is exposed directly through
interactive queries.  It's not like you're forced to pick only one approach.


> - Typical microservices would separate storing / retrieving data

I'd rather argue that for microservices you'd oftentimes prefer to *not*
use a remote DB, and rather do everything inside your microservice whatever
the microservice needs to do (perhaps we could relax this to "do everything
in a way that your microservice is in full, exclusive control", i.e. it
doesn't necessarily need to be *inside*, but arguably it would be better if
it actually is).
See e.g. the article
https://www.confluent.io/blog/data-dichotomy-rethinking-the-way-we-treat-data-and-services/
that lists some of the reasoning behind this school of thinking.  Again,
YMMV.

Personally, I think there's no simple true/false here.  The decisions
depend on what you need, what your context is, etc.  Anyways, since you
already have some opinions for the one side, I wanted to share some food
for thought for the other side of the argument. :-)

Best,
Michael





On Fri, Mar 24, 2017 at 1:25 PM, Jon Yeargers 
wrote:

> If I understand this correctly: assuming I have a simple aggregator
> distributed across n-docker instances each instance will _also_ need to
> support some sort of communications process for allowing access to its
> statestore (last param from KStream.groupby.aggregate).
>
> How would one go about substituting a separated db (EG redis) for the
> statestore?
>
> Some advantages to decoupling:
> - It would seem like having a centralized process like this would alleviate
> the need to execute multiple requests for a given kv pair (IE "who has this
> data?" and subsequent requests to retrieve it).
> - it would take some pressure off of each node to maintain a large disk
> store
> - Typical microservices would separate storing / retrieving data
> - It would raise some eyebrows if a spec called for a mysql/nosql instance
> to be installed with every docker container
> - The tombstoning facilities of redis or C* would lend themselves well to
> implementing a 'true' rolling aggregation
>
> I get that RocksDB has a small footprint but given the choice of
> implementing my own RPC / gossip-like process for data sharing and using a
> well tested one (ala C* or redis) I would almost always opt for the latter.
> (Footnote: Our implementations already heavily use redis/memcached for
> deduplication of kafka messages so it would seem a small step to use the
> same to store aggregation results.)
>
> Just my $0.02. I would love to hear why Im missing the 'big picture'. The
> kstreams architecture seems rife with potential.
>
> On Thu, Mar 23, 2017 at 3:17 PM, Matthias J. Sax 
> wrote:
>
> > The config does not "do" anything. It's metadata that gets broadcast
> 

Re: clearing an aggregation?

2017-03-23 Thread Michael Noll
> Since apparently there isn't a way to iterate through Windowed KTables Im
> guessing that this sort of 'aggregate and clear' approach still requires
an
> external datastore (like Redis). Please correct me if Im wrong.

You don't need an external datastore.  You can use state stores for that:
http://docs.confluent.io/current/streams/developer-guide.html#state-stores
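
And on the clock-alignment part of your question: because windows are aligned
to the epoch (see my earlier reply quoted below), a one-hour tumbling window
already starts exactly at the top of each hour in UTC. A sketch, where
`sourceStream` stands for your input KStream with String keys and the store
name is mine:

import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// Epoch alignment means the window boundaries fall on 00:00, 01:00, ... UTC.
final KTable<Windowed<String>, Long> hourlyCounts = sourceStream
    .groupByKey()
    .count(TimeWindows.of(TimeUnit.HOURS.toMillis(1)), "HourlyCountsStore");

You can then read a finished hour from "HourlyCountsStore" via interactive
queries instead of clearing anything.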

FWIW, a newer question of yours was asking how to access data in state
stores "from the outside", which (for the record) I answered by pointing to
Kafka's interactive queries feature:
http://docs.confluent.io/current/streams/developer-guide.html#interactive-queries

-Michael





On Wed, Mar 22, 2017 at 10:00 PM, Jon Yeargers 
wrote:

> I get that the windows are aligned along seconds but this doesn't really
> help with true clock alignment (IE top of the hour, midnight, etc).
>
> I can imagine a strategy using overlapping windows. One would
> (hypothetically) walk through the list until a window that spanned the
> desired time was found.
>
> Since apparently there isn't a way to iterate through Windowed KTables Im
> guessing that this sort of 'aggregate and clear' approach still requires an
> external datastore (like Redis). Please correct me if Im wrong.
>
> On Mon, Mar 20, 2017 at 9:29 AM, Michael Noll 
> wrote:
>
> > Jon,
> >
> > the windowing operation of Kafka's Streams API (in its DSL) aligns
> > time-based windows to the epoch [1]:
> >
> > Quoting from e.g. hopping windows (sometimes called sliding windows in
> > other technologies):
> >
> > > Hopping time windows are aligned to the epoch, with the lower interval
> > bound
> > > being inclusive and the upper bound being exclusive. “Aligned to the
> > epoch”
> > > means that the first window starts at timestamp zero.
> > > For example, hopping windows with a size of 5000ms and an advance
> > interval
> > > (“hop”) of 3000ms have predictable window boundaries
> > `[0;5000),[3000;8000),...`
> > > — and not `[1000;6000),[4000;9000),...` or even something “random” like
> > > `[1452;6452),[4452;9452),...`.
> >
> > Would that help you?
> >
> > -Michael
> >
> >
> >
> > [1] http://docs.confluent.io/current/streams/developer-guide.html
> >
> >
> > On Mon, Mar 20, 2017 at 12:51 PM, Jon Yeargers  >
> > wrote:
> >
> > > Is this possible? Im wondering about gathering data from a stream into
> a
> > > series of windowed aggregators: minute, hour and day. A separate
> process
> > > would start at fixed intervals, query the appropriate state store for
> > > available values and then hopefully clear / zero / reset everything for
> > the
> > > next interval.
> > >
> > > I could use the retention period setting but I would (somehow) need to
> > > guarantee that the windows would reset on clock boundaries and not
> based
> > on
> > > start time for the app.
> > >
> >
>


Re: Getting current value of aggregated key

2017-03-23 Thread Michael Noll
Jon,

you can use Kafka's interactive queries feature for this:
http://docs.confluent.io/current/streams/developer-guide.html#interactive-queries
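
Using the names from your snippet ("HourAggStore", String keys, VideoLogLine
values), the separate process -- which must run inside the same application
that owns the store, or reach it via RPC -- could read the aggregate roughly
like this sketch, where `streams` is your running KafkaStreams instance:

import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

final ReadOnlyWindowStore<String, VideoLogLine> store =
    streams.store("HourAggStore", QueryableStoreTypes.<String, VideoLogLine>windowStore());

final long now = System.currentTimeMillis();
// The two timestamps bound the *start* times of the windows to return.
final WindowStoreIterator<VideoLogLine> iter =
    store.fetch("some-key", now - TimeUnit.HOURS.toMillis(2), now);
while (iter.hasNext()) {
    final KeyValue<Long, VideoLogLine> entry = iter.next(); // key = window start time
    System.out.println("window start " + entry.key + " -> " + entry.value);
}
iter.close();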

-Michael



On Thu, Mar 23, 2017 at 1:52 PM, Jon Yeargers 
wrote:

> If I have an aggregation :
>
> KTable, VideoLogLine> outTable =
> sourceStream.groupByKey().reduce(rowReducer,
> TimeWindows.of(60 * 60 * 1000L).until(10 * 60 * 1000L),
> "HourAggStore");
>
> how would I go about getting some value from this with a separate process?
> I have the "HourAggStore" but Im not clear how to retrieve anything from
> it.
>


Re: Error in running PageViewTypedDemo

2017-03-23 Thread Michael Noll
The quickest answer I can give you is trying a similar example [1], where
we provide a driver that generates the required input data for the page
view example.
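
If you just want to push a couple of keyed test records yourself rather than
running that driver, a minimal producer sketch could look like the following.
The topic names and JSON fields are taken from your messages; whether this
exact JSON shape matches the demo's serdes is something you would need to
verify:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

final Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // Note the non-null keys -- the demo expects keyed records.
    producer.send(new ProducerRecord<>("streams-pageview-input",
        "1", "{\"user\":\"1\",\"page\":\"22\",\"timestamp\":1435278171139}"));
    producer.send(new ProducerRecord<>("streams-userprofile-input",
        "1", "{\"region\":\"CA\",\"timestamp\":1435278171139}"));
}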

-Michael



[1]
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java
(this is for Confluent 3.2 and Apache Kafka 0.10.2.0)



On Thu, Mar 23, 2017 at 11:56 AM, Shanthi Nellaiappan 
wrote:

> Any example for the above would be appreciated. Thanks
>
> On Wed, Mar 22, 2017 at 2:50 PM, Shanthi Nellaiappan 
> wrote:
>
> > Thanks for the info.
> > With "page2",{"user":"2", "page":"22", "timestamp":143527817} as
> input
> > for streams-pageview-input   an "2",{"region":"CA","timestamp"
> :143527817}
> > as input for  streams-userprofile-input, the following error is shown,
> > Exception in thread "StreamThread-1" java.lang.IllegalArgumentException:
> > JsonTimestampExtractor cannot recognize the record value
> > org.apache.kafka.streams.examples.pageview.PageViewTypedDemo$
> > PageViewByRegion@4764b2e
> > at org.apache.kafka.streams.examples.pageview.JsonTimestampExtractor.
> > extract(JsonTimestampExtractor.java:43)
> > at org.apache.kafka.streams.processor.internals.
> RecordQueue.addRawRecords(
> > RecordQueue.java:105)
> > at org.apache.kafka.streams.processor.internals.
> > PartitionGroup.addRawRecords(PartitionGroup.java:117)
> > at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(
> > StreamTask.java:144)
> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:415)
> > at org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:242)
> >
> > Any example on the correct input value is really appreciated.
> >
> > Thanks
> >
> > On Wed, Mar 22, 2017 at 12:27 PM, Michael Noll 
> > wrote:
> >
> >> IIRC the PageViewTypedDemo example requires input data where the
> >> username/userId is captured in the keys of messages/records, and further
> >> information in the values of those messages.
> >>
> >> The problem you are running into is that, when you are writing your
> input
> >> data via the console producer, the records you are generating only have
> >> values -- the keys are null because you don't specify any explicitly.
> >>
> >> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
> >> streams-userprofile-input
> >> >
> >> > {"region":"CA", "timestamp":1435278171139}
> >>
> >> And you have the same issue for the other topic,
> "streams-pageview-input".
> >>
> >> To enter keys, you need to add some CLI options to the console producer.
> >>
> >> Example:
> >>
> >> $ bin/kafka-console-producer --broker-list localhost:9092 \
> >>  --topic streams-userprofile-input \
> >>  --property parse.key=true \
> >>  --property key.separator=,
> >>
> >> firstUser,firstValue
> >> secondUser,secondValue
> >>
> >> Hope this helps,
> >> Michael
> >>
> >>
> >>
> >>
> >> On Wed, Mar 22, 2017 at 8:10 PM, Shanthi Nellaiappan <
> shan2n...@gmail.com
> >> >
> >> wrote:
> >>
> >> > I have started exploring kafka streaming API. I'm trying to  run
> >> > PageViewTypedDemo program as it is without any changes locally on a
> >> > desktop. Current kafka version is 0.10.1.0.
> >> >
> >> > With the following inputs from 2 different console,
> >> >
> >> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
> >> > streams-pageview-input
> >> >
> >> > {"user":"1", "page":"22", "timestamp":143527817}
> >> >
> >> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
> >> > streams-userprofile-input
> >> >
> >> > {"region":"CA", "timestamp":1435278171139}
> >> >
> >> > The error is
> >> >
> >> > Exception in thread "StreamThread-1"
> >> > org.apache.kafka.streams.errors.StreamsException: Exception caught in
> >> > proces

Re: kafka streams in-memory Keyvalue store iterator remove broken on upgrade to 0.10.2.0 from 0.10.1.1

2017-03-22 Thread Michael Noll
To add to what Matthias said, in case the following isn't clear:

- You should not (and, in 0.10.2, cannot any longer) call the iterator's
remove() method, i.e. `KeyValueIterator#remove()` when iterating through a
`KeyValueStore`.  Perhaps this is something we should add to the
`KeyValueIterator` javadocs.

- You can of course call the store's delete() method:
`KeyValueStore#delete(K key)`.

Just mentioning this because, when reading the thread quickly, I missed the
"iterator" part and thought removal/deletion on the store wasn't working.
;-)
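
In code, the supported pattern is roughly the following sketch (key/value
types and the shouldBeRemoved() predicate are placeholders of mine):

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Iterate read-only, remember what to drop, then delete via the store itself.
final List<String> toDelete = new ArrayList<>();
final KeyValueIterator<String, Long> it = store.all(); // store: KeyValueStore<String, Long>
while (it.hasNext()) {
    final KeyValue<String, Long> entry = it.next();
    if (shouldBeRemoved(entry.value)) { // your own predicate
        toDelete.add(entry.key);
    }
}
it.close();
for (final String key : toDelete) {
    store.delete(key);
}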

Best,
Michael




On Wed, Mar 22, 2017 at 8:18 PM, Matthias J. Sax 
wrote:

> Hi,
>
> remove() should not be supported -- thus, it's actually a bug in 0.10.1
> that got fixed in 0.10.2.
>
> Stores should only be altered by Streams and iterator over the stores
> should be read-only -- otherwise, you might mess up Streams internal state.
>
> I would highly recommend to reconsider the call to it.remove() in you
> application. Not sure what you try to accomplish, but you should do it
> differently.
>
>
> -Matthias
>
>
> On 3/22/17 8:00 AM, Tom Dearman wrote:
> > Hi, hope someone on kafka-streams team can help.  Our application uses
> >
> > KeyValueIterator it = KeyValueStore.all();
> >
> > …..
> > it.remove()
> >
> >
> > This used to work but is now broken, causes our punctuate to fail and
> StreamThread to die.  The cause seems to be that there were changes in
> 0.10.2.0 to InMemoryKeyValueStoreSupplier:
> >
> >
> >
> > public synchronized KeyValueIterator all() {
> > final TreeMap copy = new TreeMap<>(this.map);
> > return new MemoryStoreIterator<>(copy.entrySet().iterator());
> > }
> >
> > @Override
> > public synchronized KeyValueIterator all() {
> > final TreeMap copy = new TreeMap<>(this.map);
> > return new DelegatingPeekingKeyValueIterator<>(name, new
> MemoryStoreIterator<>(copy.entrySet().iterator()));
> > }
> > But the DelegatingPeekingKeyValueIterator has:
> >
> > @Override
> > public void remove() {
> > throw new UnsupportedOperationException("remove not supported");
> > }
> > whereas the old direct call on MemoryStoreIterator allowed remove.  For
> some reason there is no call to underlying.remove() in the
> DelegatingPeekingKeyValueIterator.
> >
> > We don’t want to downgrade to 0.10.1.1 as there was a useful bug fix and
> removing the dependency on ZooKeeper.
> >
> > Thanks,
> > Tom
> >
>
>


Re: Error in running PageViewTypedDemo

2017-03-22 Thread Michael Noll
IIRC the PageViewTypedDemo example requires input data where the
username/userId is captured in the keys of messages/records, and further
information in the values of those messages.

The problem you are running into is that, when you are writing your input
data via the console producer, the records you are generating only have
values -- the keys are null because you don't specify any explicitly.

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
streams-userprofile-input
>
> {"region":"CA", "timestamp":1435278171139}

And you have the same issue for the other topic, "streams-pageview-input".

To enter keys, you need to add some CLI options to the console producer.

Example:

$ bin/kafka-console-producer --broker-list localhost:9092 \
 --topic streams-userprofile-input \
 --property parse.key=true \
 --property key.separator=,

firstUser,firstValue
secondUser,secondValue

Hope this helps,
Michael




On Wed, Mar 22, 2017 at 8:10 PM, Shanthi Nellaiappan 
wrote:

> I have started exploring kafka streaming API. I'm trying to  run
> PageViewTypedDemo program as it is without any changes locally on a
> desktop. Current kafka version is 0.10.1.0.
>
> With the following inputs from 2 different console,
>
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
> streams-pageview-input
>
> {"user":"1", "page":"22", "timestamp":143527817}
>
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
> streams-userprofile-input
>
> {"region":"CA", "timestamp":1435278171139}
>
> The error is
>
> Exception in thread "StreamThread-1"
> org.apache.kafka.streams.errors.StreamsException: Exception caught in
> process. taskId=0_0, processor=KSTREAM-SOURCE-01,
> topic=streams-userprofile-input, partition=0, offset=0
>
> at
> org.apache.kafka.streams.processor.internals.
> StreamTask.process(StreamTask.java:200)
>
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:436)
>
> at
> org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:242)
>
> Caused by: org.apache.kafka.streams.errors.StreamsException: Record key
> for
> the source KTable from store name streams-userprofile-store-name should not
> be null.
>
> at
> org.apache.kafka.streams.kstream.internals.KTableSource$
> MaterializedKTableSourceProcessor.process(KTableSource.java:83)
>
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:82)
>
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> ProcessorContextImpl.java:204)
>
> at
> org.apache.kafka.streams.processor.internals.
> SourceNode.process(SourceNode.java:66)
>
> at
> org.apache.kafka.streams.processor.internals.
> StreamTask.process(StreamTask.java:181)
>
> ... 2 more
>
>
> Can someone help .Is there anything else to be done apart from creating the
> 2 topics streams-pageview-input & streams-userprofile-input
>


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-22 Thread Michael Noll
Forwarding to kafka-user.


-- Forwarded message --
From: Michael Noll 
Date: Wed, Mar 22, 2017 at 8:48 AM
Subject: Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API
To: d...@kafka.apache.org


Matthias,

> @Michael:
>
> You seemed to agree with Jay about not exposing the `Topology` concept
> in our main entry class (ie, current KStreamBuilder), thus, I
> interpreted that you do not want `Topology` in the name either (I am a
> little surprised by your last response, that goes the opposite direction).

Oh, sorry for not being clear.

What I wanted to say in my earlier email was the following:  Yes, I do
agree with most of Jay's reasoning, notably about carefully deciding how
much and which parts of the API/concept "surface" we expose to users of the
DSL.  However, and this is perhaps where I wasn't very clear, I disagree on
the particular opinion about not exposing the topology concept to DSL
users.  Instead, I think the concept of a topology is important to
understand even for DSL users -- particularly because of the way the DSL is
currently wiring your processing logic via the builder pattern.  (As I
noted, e.g. Akka uses a different approach where you might be able to get
away with not exposing the "topology" concept, but even in Akka there's the
notion of graphs and flows.)


> > StreamsBuilder builder = new StreamsBuilder();
> >
> > // And here you'd define your...well, what actually?
> > // Ah right, you are composing a topology here, though you are not
> > aware of it.
>
> Yes. You are not aware of if -- that's the whole point about it -- don't
> put the Topology concept in the focus...

Let me turn this around, because that was my point: it's confusing to have
a name "StreamsBuilder" if that thing isn't building streams, and it is not.

As I mentioned before, I do think it is a benefit to make it clear to DSL
users that there are two aspects at play: (1) defining the logic/plan of
your processing, and (2) the execution of that plan.  I have a less strong
opinion whether or not having "topology" in the names would help to
communicate this separation as well as combination of (1) and (2) to make
your app work as expected.

If we stick with `KafkaStreams` for (2) *and* don't like having "topology"
in the name, then perhaps we should rename `KStreamBuilder` to
`KafkaStreamsBuilder`.  That at least gives some illusion of a combo of (1)
and (2).  IMHO, `KafkaStreamsBuilder` highlights better that "it is a
builder/helper for the Kafka Streams API", rather than "a builder for
streams".

Also, I think some of the naming challenges we're discussing here are
caused by having this builder pattern in the first place.  If the Streams
API was implemented in Scala, for example, we could use implicits for
helping us to "stitch streams/tables together to build the full topology",
thus using a different (better?) approach to composing your topologies than
through a builder pattern.  So: perhaps there's a better way than the
builder, and that way would also be clearer on terminology?  That said,
this might take this KIP off-scope.

-Michael




On Wed, Mar 22, 2017 at 12:33 AM, Matthias J. Sax 
wrote:

> @Guozhang:
>
> I recognized that you want to have `Topology` in the name. But it seems
> that more people preferred to not have it (Jay, Ram, Michael [?], myself).
>
> @Michael:
>
> You seemed to agree with Jay about not exposing the `Topology` concept
> in our main entry class (ie, current KStreamBuilder), thus, I
> interpreted that you do not want `Topology` in the name either (I am a
> little surprised by your last response, that goes the opposite direction).
>
> > StreamsBuilder builder = new StreamsBuilder();
> >
> > // And here you'd define your...well, what actually?
> > // Ah right, you are composing a topology here, though you are not
> > aware of it.
>
> Yes. You are not aware of if -- that's the whole point about it -- don't
> put the Topology concept in the focus...
>
> Furthermore,
>
> >>> So what are you building here with StreamsBuilder?  Streams (hint: No)?
> >>> And what about tables -- is there a TableBuilder (hint: No)?
>
> I am not sure, if this is too much a concern. In contrast to
> `KStreamBuilder` (singular) that contains `KStream` and thus puts
> KStream concept in focus and thus degrade `KTable`, `StreamsBuilder`
> (plural) focuses on "Streams API". IMHO, it does not put focus on
> KStream. It's just a builder from the Streams API -- you don't need to
> worry what you are building -- and you don't need to think about the
> `Topology` concept (of course, you see that .build() return a Top

Re: Using Kafka Stream in the cluster of kafka on one or multiple docker-machine/s

2017-03-21 Thread Michael Noll
Typically you'd containerize your app and then launch e.g. 10 containers if
you need to run 10 instances of your app.

Also, what do you mean by "in a cluster of Kafka containers" and "in the
cluster of Kafkas"?

On Tue, Mar 21, 2017 at 9:08 PM, Mina Aslani  wrote:

> Hi,
>
> I am trying to understand how I can use a kafka stream app(jar file) in a
> cluster of kafka containers.
>
> Kafka does not have master/slave concept (unlike spark), how I should run
> my app in the cluster of kafkas (e.g. on one or multiple docker-machine/s)?
>
> I use below command line when having one VM/node with one kafka container
> https://github.com/confluentinc/examples/blob/3.
> 2.x/kafka-streams/src/main/
> java/io/confluent/examples/streams/WordCountLambdaExample.java#L55-L62
>
> Best regards,
> Mina
>


Re: Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-20 Thread Michael Noll
> >> Right now, a new client is actively instantiated (i.e., by calling "new")
> >> and the topology is provided as a constructor argument. However,
> >> especially for DSL (not sure if it would make sense for PAPI), the DSL
> >> builder could create the client for the user.
> >>
> >> Something like this:
> >>
> >>> KStreamBuilder builder = new KStreamBuilder();
> >>> builder.whatever() // use the builder
> >>>
> >>> StreamsConfig config = 
> >>> KafkaStreams streams = builder.getKafkaStreams(config);
> >>
> >> If we change the pattern like this, the notion of the "DSL builder" would
> >> change, as it does not create a topology anymore, but instead creates the
> >> "processing client". This would address Jay's concern about "not
> >> exposing concepts users don't need to understand" and would not require
> >> including the word "Topology" in the DSL builder class name, because
> >> the builder does not build a Topology anymore.
> >>
> >> I just put some names that came to my mind first hand -- did not think
> >> about good names. It's just to discuss the pattern.
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >>
> >>
> >> On 3/14/17 3:36 AM, Michael Noll wrote:
> >>> I see Jay's point, and I agree with much of it -- notably about being
> >>> careful which concepts we do and do not expose, depending on which user
> >>> group / user type is affected.  That said, I'm not sure yet whether or
> >> not
> >>> we should get rid of "Topology" (or a similar term) in the DSL.
> >>>
> >>> For what it's worth, here's how related technologies define/name their
> >>> "topologies" and "builders".  Note that, in all cases, it's about
> >>> constructing a logical processing plan, which then is being
> executed/run.
> >>>
> >>> - `Pipeline` (Google Dataflow/Apache Beam)
> >>> - To add a source you first instantiate the Source (e.g.
> >>> `TextIO.Read.from("gs://some/inputData.txt")`),
> >>>   then attach it to your processing plan via
> >> `Pipeline#apply()`.
> >>>   This setup is a bit different to our DSL because in our DSL the
> >>> builder does both, i.e.
> >>>   instantiating + auto-attaching to itself.
> >>> - To execute the processing plan you call `Pipeline#execute()`.
> >>> - `StreamingContext` (Spark): This setup is similar to our DSL.
> >>> - To add a source you call e.g.
> >>> `StreamingContext#socketTextStream("localhost", )`.
> >>> - To execute the processing plan you call
> >> `StreamingContext#execute()`.
> >>> - `StreamExecutionEnvironment` (Flink): This setup is similar to our
> DSL.
> >>> - To add a source you call e.g.
> >>> `StreamExecutionEnvironment#socketTextStream("localhost", )`.
> >>> - To execute the processing plan you call
> >>> `StreamExecutionEnvironment#execute()`.
> >>> - `Graph`/`Flow` (Akka Streams), as a result of composing Sources (~
> >>> `KStreamBuilder.stream()`) and Sinks (~ `KStream#to()`)
> >>>   into Flows, which are [Runnable]Graphs.
> >>> - You instantiate a Source directly, and then compose the Source
> with
> >>> Sinks to create a RunnableGraph:
> >>>   see signature `Source#to[Mat2](sink: Graph[SinkShape[Out],
> Mat2]):
> >>> RunnableGraph[Mat]`.
> >>> - To execute the processing plan you call `Flow#run()`.
> >>>
> >>> In our DSL, in comparison, we do:
> >>>
> >>> - `KStreamBuilder` (Kafka Streams API)
> >>> - To add a source you call e.g. `KStreamBuilder#stream("input-
> >> topic")`.
> >>> - To execute the processing plan you create a `KafkaStreams`
> instance
> >>> from `KStreamBuilder`
> >>>   (where the builder will instantiate the topology = processing
> plan
> >> to
> >>> be executed), and then
> >>>   call `KafkaStreams#start()`.  Think of `KafkaStreams` as our
> >> runner.
> >>>
> >>> First, I agree with the sentiment that the current name of
> >> `KStreamBuilder`
> >>> isn't great (which is why we're having this dis

Re: Out of order message processing with Kafka Streams

2017-03-20 Thread Michael Noll
Ali,

what you describe is (roughly!) how Kafka Streams implements the internal
state stores to support windowing.

Some users have been following a similar approach as you outlined, using
the Processor API.
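
For illustration only, here is a rough sketch of how such gap-based "session"
bucketing could look with the Processor API (0.10.x).  The class and store
names are made up and this is not production code; it just tracks the
last-seen timestamp per key in a key-value store and flags records that open
a new session after a gap of more than 30 minutes:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class SessionBucketProcessor implements Processor<String, String> {

    private static final long GAP_MS = 30 * 60 * 1000L;  // 30 minutes

    private ProcessorContext context;
    private KeyValueStore<String, Long> lastSeen;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // Assumes a store named "last-seen" was added to the topology beforehand.
        this.lastSeen = (KeyValueStore<String, Long>) context.getStateStore("last-seen");
    }

    @Override
    public void process(String key, String value) {
        long now = context.timestamp();
        Long previous = lastSeen.get(key);
        boolean newSession = (previous == null) || (now - previous > GAP_MS);
        lastSeen.put(key, now);
        // Tag the record so downstream processors know whether it opened a new session.
        context.forward(key, (newSession ? "NEW_SESSION:" : "SAME_SESSION:") + value);
    }

    @Override
    public void punctuate(long timestamp) { /* not used in this sketch */ }

    @Override
    public void close() { /* nothing to clean up */ }
}

The store itself would be added to the topology via
TopologyBuilder#addStateStore() (and, when mixing with the DSL, referenced by
name in transform()); stores created through the Stores factory are
changelog-backed by default, so they are fault-tolerant.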



On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar  wrote:

> It would be helpful to know the 'start' and 'end' of the current metadata,
> so if an out of order message arrives late, and is being processed in
> foreach(), you'd know which window / bucket it belongs to, and can handle
> it accordingly.
>
> I'm guessing that's not possible at the moment.
>
> (My use case is, i receive a stream of messages. Messages need to be stored
> and sorted into 'buckets', to indicate 'sessions'. Each time there's a gap
> of 30 mins or more since the last message (under a key), a new 'session'
> (bucket) should be started, and future messages should belong to that
> 'session', until the next 30+ min gap).
>
> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll 
> wrote:
>
> > > Can windows only be used for aggregations, or can they also be used for
> > foreach(),
> > and such?
> >
> > As of today, you can use windows only in aggregations.
> >
> > > And is it possible to get metadata on the message, such as whether or
> > not its
> > late, its index/position within the other messages, etc?
> >
> > If you use the Processor API of Kafka Streams, you can have access to an
> > incoming record's topic, partition, offset, etc. via the so-called
> > ProcessorContext (which is updated for every new incoming record):
> >
> > http://docs.confluent.io/current/streams/javadocs/org/
> > apache/kafka/streams/processor/Processor.html
> > - You can get/store a reference to the ProcessorContext from
> > `Processor#init()`.
> >
> > http://docs.confluent.io/current/streams/javadocs/org/
> > apache/kafka/streams/processor/ProcessorContext.html
> > - The context can then be used within `Processor#process()` when you
> > process a new record.  As I said, the context is updated behind the
> scenes
> > to match the record that is currently being processed.
> >
> >
> > Best,
> > Michael
> >
> >
> >
> >
> > On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar 
> wrote:
> >
> > > Can windows only be used for aggregations, or can they also be used for
> > > foreach(), and such?
> > >
> > > And is it possible to get metadata on the message, such as whether or
> not
> > > its late, its index/position within the other messages, etc?
> > >
> > > On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll 
> > > wrote:
> > >
> > > > And since you asked for a pointer, Ali:
> > > > http://docs.confluent.io/current/streams/concepts.html#windowing
> > > >
> > > >
> > > > On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll 
> > > > wrote:
> > > >
> > > > > Late-arriving and out-of-order data is only treated specially for
> > > > windowed
> > > > > aggregations.
> > > > >
> > > > > For stateless operations such as `KStream#foreach()` or
> > > `KStream#map()`,
> > > > > records are processed in the order they arrive (per partition).
> > > > >
> > > > > -Michael
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar  >
> > > > wrote:
> > > > >
> > > > >> > later when message A arrives it will put that message back into
> > > > >> > the right temporal context and publish an amended result for the
> > > > proper
> > > > >> > time/session window as if message B were consumed in the
> timestamp
> > > > order
> > > > >> > before message A.
> > > > >>
> > > > >> Does this apply to the aggregation Kafka stream methods then, and
> > not
> > > to
> > > > >> e.g foreach?
> > > > >>
> > > > >> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <
> h...@confluent.io>
> > > > >> wrote:
> > > > >>
> > > >> > Yes stream processing and CEP are subtly different things.
> > > > >> >
> > > > >> > Kafka Streams helps you write stateful apps and allows that
> state
> > to
> > > > be
>

Re: Out of order message processing with Kafka Streams

2017-03-20 Thread Michael Noll
> Can windows only be used for aggregations, or can they also be used for 
> foreach(),
and such?

As of today, you can use windows only in aggregations.

> And is it possible to get metadata on the message, such as whether or not its
late, its index/position within the other messages, etc?

If you use the Processor API of Kafka Streams, you can have access to an
incoming record's topic, partition, offset, etc. via the so-called
ProcessorContext (which is updated for every new incoming record):

http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/Processor.html
- You can get/store a reference to the ProcessorContext from
`Processor#init()`.

http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/ProcessorContext.html
- The context can then be used within `Processor#process()` when you
process a new record.  As I said, the context is updated behind the scenes
to match the record that is currently being processed.
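
To make that concrete, a minimal sketch (illustrative only, class name made
up) of a processor that reads this metadata from the ProcessorContext:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class MetadataLoggingProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        // Keep a reference; the context is updated for every incoming record.
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        System.out.printf("record from %s[%d]@%d, timestamp=%d%n",
            context.topic(), context.partition(), context.offset(), context.timestamp());
        context.forward(key, value);  // pass the record through unchanged
    }

    @Override
    public void punctuate(long timestamp) { }

    @Override
    public void close() { }
}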


Best,
Michael




On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar  wrote:

> Can windows only be used for aggregations, or can they also be used for
> foreach(), and such?
>
> And is it possible to get metadata on the message, such as whether or not
> its late, its index/position within the other messages, etc?
>
> On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll 
> wrote:
>
> > And since you asked for a pointer, Ali:
> > http://docs.confluent.io/current/streams/concepts.html#windowing
> >
> >
> > On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll 
> > wrote:
> >
> > > Late-arriving and out-of-order data is only treated specially for
> > windowed
> > > aggregations.
> > >
> > > For stateless operations such as `KStream#foreach()` or
> `KStream#map()`,
> > > records are processed in the order they arrive (per partition).
> > >
> > > -Michael
> > >
> > >
> > >
> > >
> > > On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar 
> > wrote:
> > >
> > >> > later when message A arrives it will put that message back into
> > >> > the right temporal context and publish an amended result for the
> > proper
> > >> > time/session window as if message B were consumed in the timestamp
> > order
> > >> > before message A.
> > >>
> > >> Does this apply to the aggregation Kafka stream methods then, and not
> to
> > >> e.g foreach?
> > >>
> > >> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen 
> > >> wrote:
> > >>
> > >> > Yes stream processing and CEP are subtly different things.
> > >> >
> > >> > Kafka Streams helps you write stateful apps and allows that state to
> > be
> > >> > preserved on disk (a local State store) as well as distributed for
> HA
> > or
> > >> > for parallel partitioned processing (via Kafka topic partitions and
> > >> > consumer groups) as well as in memory (as a performance
> enhancement).
> > >> >
> > >> > However a classical CEP engine with a pre-modeled state machine and
> > >> > pattern matching rules is something different from stream
> processing.
> > >> >
> > >> > It is of course possible to build a CEP system on top of Kafka
> Streams
> > >> and
> > >> > get the best of both worlds.
> > >> >
> > >> > -hans
> > >> >
> > >> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> > >> > sabarish@gmail.com> wrote:
> > >> > >
> > >> > > Hans
> > >> > >
> > >> > > What you state would work for aggregations, but not for state
> > machines
> > >> > and
> > >> > > CEP.
> > >> > >
> > >> > > Regards
> > >> > > Sab
> > >> > >
> > >> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" 
> > >> wrote:
> > >> > >>
> > >> > >> The only way to make sure A is consumed first would be to delay
> the
> > >> > >> consumption of message B for at least 15 minutes which would fly
> in
> > >> the
> > >> > >> face of the principles of a true streaming platform so the short
> > >> answer
> > >> > to
> > >> > >> your question is "no" because that would be batch processing not
> > >> str

Re: clearing an aggregation?

2017-03-20 Thread Michael Noll
Jon,

the windowing operation of Kafka's Streams API (in its DSL) aligns
time-based windows to the epoch [1]:

Quoting from e.g. hopping windows (sometimes called sliding windows in
other technologies):

> Hopping time windows are aligned to the epoch, with the lower interval
> bound being inclusive and the upper bound being exclusive. “Aligned to
> the epoch” means that the first window starts at timestamp zero.
> For example, hopping windows with a size of 5000ms and an advance interval
> (“hop”) of 3000ms have predictable window boundaries
> `[0;5000),[3000;8000),...` — and not `[1000;6000),[4000;9000),...` or even
> something “random” like `[1452;6452),[4452;9452),...`.

Would that help you?
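
For example, a windowed count over exactly those hopping windows could be
defined roughly like this in the 0.10.2 DSL (sketch only; topic and store
names are made up, and String serdes are assumed to be configured as the
defaults):

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> input = builder.stream("input-topic");

// Hopping windows of size 5000 ms advancing by 3000 ms, aligned to the epoch,
// i.e. [0;5000), [3000;8000), [6000;11000), ...
KTable<Windowed<String>, Long> counts = input
    .groupByKey()
    .count(TimeWindows.of(5000L).advanceBy(3000L), "counts-store");

The window boundaries are therefore predictable, epoch-aligned intervals,
independent of when the application was started.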

-Michael



[1] http://docs.confluent.io/current/streams/developer-guide.html


On Mon, Mar 20, 2017 at 12:51 PM, Jon Yeargers 
wrote:

> Is this possible? I'm wondering about gathering data from a stream into a
> series of windowed aggregators: minute, hour and day. A separate process
> would start at fixed intervals, query the appropriate state store for
> available values and then hopefully clear / zero / reset everything for the
> next interval.
>
> I could use the retention period setting but I would (somehow) need to
> guarantee that the windows would reset on clock boundaries and not based on
> start time for the app.
>


Re: Out of order message processing with Kafka Streams

2017-03-20 Thread Michael Noll
Late-arriving and out-of-order data is only treated specially for windowed
aggregations.

For stateless operations such as `KStream#foreach()` or `KStream#map()`,
records are processed in the order they arrive (per partition).

-Michael




On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar  wrote:

> > later when message A arrives it will put that message back into
> > the right temporal context and publish an amended result for the proper
> > time/session window as if message B were consumed in the timestamp order
> > before message A.
>
> Does this apply to the aggregation Kafka stream methods then, and not to
> e.g foreach?
>
> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen  wrote:
>
> > Yes stream processing and CEP are subtly different things.
> >
> > Kafka Streams helps you write stateful apps and allows that state to be
> > preserved on disk (a local State store) as well as distributed for HA or
> > for parallel partitioned processing (via Kafka topic partitions and
> > consumer groups) as well as in memory (as a performance enhancement).
> >
> > However a classical CEP engine with a pre-modeled state machine and
> > pattern matching rules is something different from stream processing.
> >
> > It is of course possible to build a CEP system on top of Kafka Streams
> and
> > get the best of both worlds.
> >
> > -hans
> >
> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> > sabarish@gmail.com> wrote:
> > >
> > > Hans
> > >
> > > What you state would work for aggregations, but not for state machines
> > and
> > > CEP.
> > >
> > > Regards
> > > Sab
> > >
> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" 
> wrote:
> > >>
> > >> The only way to make sure A is consumed first would be to delay the
> > >> consumption of message B for at least 15 minutes which would fly in
> the
> > >> face of the principles of a true streaming platform so the short
> answer
> > to
> > >> your question is "no" because that would be batch processing not
> stream
> > >> processing.
> > >>
> > >> However, Kafka Streams does handle late arriving data. So if you had
> > some
> > >> analytics that computes results on a time window or a session window
> > then
> > >> Kafka streams will compute on the stream in real time (processing
> > message
> > >> B) and then later when message A arrives it will put that message back
> > into
> > >> the right temporal context and publish an amended result for the
> proper
> > >> time/session window as if message B were consumed in the timestamp
> order
> > >> before message A. The end result of this flow is that you eventually
> get
> > >> the same results you would get in a batch processing system but with
> the
> > >> added benefit of getting intermediary result at much lower latency.
> > >>
> > >> -hans
> > >>
> > >> /**
> > >> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> > >> * h...@confluent.io (650)924-2670
> > >> */
> > >>
> > >>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar 
> > wrote:
> > >>>
> > >>> Is it possible to have Kafka Streams order messages correctly by
> their
> > >>> timestamps, even if they arrived out of order?
> > >>>
> > >>> E.g, say Message A with a timestamp of 5:00 PM and Message B with a
> > >>> timestamp of 5:15 PM, are sent.
> > >>>
> > >>> Message B arrives sooner than Message A, due to network issues.
> > >>>
> > >>> Is it possible to make sure that, across all consumers of Kafka
> Streams
> > >>> (even if they are across different servers, but have the same
> consumer
> > >>> group), Message A is consumed first, before Message B?
> > >>>
> > >>> Thanks.
> > >>>
> > >>
> >
>


Re: Out of order message processing with Kafka Streams

2017-03-20 Thread Michael Noll
And since you asked for a pointer, Ali:
http://docs.confluent.io/current/streams/concepts.html#windowing


On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll  wrote:

> Late-arriving and out-of-order data is only treated specially for windowed
> aggregations.
>
> For stateless operations such as `KStream#foreach()` or `KStream#map()`,
> records are processed in the order they arrive (per partition).
>
> -Michael
>
>
>
>
> On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar  wrote:
>
>> > later when message A arrives it will put that message back into
>> > the right temporal context and publish an amended result for the proper
>> > time/session window as if message B were consumed in the timestamp order
>> > before message A.
>>
>> Does this apply to the aggregation Kafka stream methods then, and not to
>> e.g foreach?
>>
>> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen 
>> wrote:
>>
>> > Yes stream processing and CEP are subtly different things.
>> >
>> > Kafka Streams helps you write stateful apps and allows that state to be
>> > preserved on disk (a local State store) as well as distributed for HA or
>> > for parallel partitioned processing (via Kafka topic partitions and
>> > consumer groups) as well as in memory (as a performance enhancement).
>> >
>> > However a classical CEP engine with a pre-modeled state machine and
>> > pattern matching rules is something different from stream processing.
>> >
>> > It is of course possible to build a CEP system on top of Kafka Streams
>> and
>> > get the best of both worlds.
>> >
>> > -hans
>> >
>> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
>> > sabarish@gmail.com> wrote:
>> > >
>> > > Hans
>> > >
>> > > What you state would work for aggregations, but not for state machines
>> > and
>> > > CEP.
>> > >
>> > > Regards
>> > > Sab
>> > >
>> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" 
>> wrote:
>> > >>
>> > >> The only way to make sure A is consumed first would be to delay the
>> > >> consumption of message B for at least 15 minutes which would fly in
>> the
>> > >> face of the principles of a true streaming platform so the short
>> answer
>> > to
>> > >> your question is "no" because that would be batch processing not
>> stream
>> > >> processing.
>> > >>
>> > >> However, Kafka Streams does handle late arriving data. So if you had
>> > some
>> > >> analytics that computes results on a time window or a session window
>> > then
>> > >> Kafka streams will compute on the stream in real time (processing
>> > message
>> > >> B) and then later when message A arrives it will put that message
>> back
>> > into
>> > >> the right temporal context and publish an amended result for the
>> proper
>> > >> time/session window as if message B were consumed in the timestamp
>> order
>> > >> before message A. The end result of this flow is that you eventually
>> get
>> > >> the same results you would get in a batch processing system but with
>> the
>> > >> added benefit of getting intermediary result at much lower latency.
>> > >>
>> > >> -hans
>> > >>
>> > >> /**
>> > >> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>> > >> * h...@confluent.io (650)924-2670
>> > >> */
>> > >>
>> > >>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar 
>> > wrote:
>> > >>>
>> > >>> Is it possible to have Kafka Streams order messages correctly by
>> their
>> > >>> timestamps, even if they arrived out of order?
>> > >>>
>> > >>> E.g, say Message A with a timestamp of 5:00 PM and Message B with a
>> > >>> timestamp of 5:15 PM, are sent.
>> > >>>
>> > >>> Message B arrives sooner than Message A, due to network issues.
>> > >>>
>> > >>> Is it possible to make sure that, across all consumers of Kafka
>> Streams
>> > >>> (even if they are across different servers, but have the same
>> consumer
>> > >>> group), Message A is consumed first, before Message B?
>> > >>>
>> > >>> Thanks.
>> > >>>
>> > >>
>> >
>>
>
>
>


Re: Not Serializable Result Error

2017-03-15 Thread Michael Noll
Hi Armaan,

> org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0.0 in stage 0.0 (TID 0) had a not serializable result:
> org.apache.kafka.clients.consumer.ConsumerRecord

perhaps you should ask that question in the Spark mailing list, which
should increase your chances of getting a good response for this Spark
error.  You should also share the Spark and Kafka versions you use.

-Michael



On Fri, Mar 10, 2017 at 7:34 PM, Armaan Esfahani <
armaan.esfah...@advancedopen.com> wrote:

> Hello, I have been trying to setup a SMACK stack to learn the basics of
> Kafka Streams and Spark, yet I keep coming across the following error:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0.0 in stage 0.0 (TID 0) had a not serializable result:
> org.apache.kafka.clients.consumer.ConsumerRecord
>
>
>
> I have a “Tweet” object which is a simple POJO with a Date and String that
> then has a Serializer and Deserializer class.
>
>
>
> I have tested creating an object, serializing it to a local file, then
> reading it with the deserializer and it works fine—however over the stream
> it fails.
>
>
>
> To read the data from the Kafka stream, I have set up an input stream
> using the following code:
>
>
>
> Map<String, Object> kafkaParams = new HashMap<>();
>
> kafkaParams.put("bootstrap.servers", brokers);
>
> kafkaParams.put("key.deserializer", StringDeserializer.class);
>
> kafkaParams.put("value.deserializer", TweetDeserializer.class);
>
> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>
>
>
> JavaInputDStream<ConsumerRecord<String, Tweet>> tweets =
> KafkaUtils.createDirectStream(
>
> jssc,
>
> LocationStrategies.PreferConsistent(),
>
> ConsumerStrategies.Subscribe(topicsSet,
> kafkaParams)
>
> );
>
>
>
> To send a sample object to Kafka, I have the following for testing:
>
>
>
> Properties props = new Properties();
>
> props.put("bootstrap.servers", "192.168.194.194:9092");
>
> props.put("key.serializer", "org.apache.kafka.common.serialization.
> StringSerializer");
>
> props.put("value.serializer", "com.armaanaki.smack.tweet.
> TweetSerializer");
>
>
>
> final KafkaProducer<String, Tweet> kafkaProducer = new
> KafkaProducer<>(props);
>
>
>
> ProducerRecord<String, Tweet> record = new ProducerRecord<String, Tweet>("tweets1", "1", new Tweet(new Date(), "Test"));
>
> kafkaProducer.send(record);
>
>
>
>
>
> Can anyone explain my error? Thanks!
>
>


Re: Trying to use Kafka Stream

2017-03-15 Thread Michael Noll
Ah, I see.

> However, running the program (e.g.
> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181)
> in my IDE was not and still is not working.

Another thing to try is to run the program above from the CLI, not from
your IDE (perhaps your IDE setup is wonky).
That's described in step 3 of the program's usage instructions [1].

[1]
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L55-L62



On Wed, Mar 15, 2017 at 12:56 PM, Mina Aslani  wrote:

> Hi Michael,
>
> I was aware that the output should be written in a kafka topic not the
> console.
>
> To understand if streams can reach the kafka as Eno asked in earlier email
> I found http://docs.confluent.io/3.2.0/streams/quickstart.html
> #goal-of-this-quickstart and went through the steps mentioned and ran
> /usr/bin/kafka-run-class
> org.apache.kafka.streams.examples.wordcount.WordCountDemo which works.
>
> However, running the program (e.g. https://github.com/
> confluentinc/examples/blob/3.2.x/kafka-streams/src/main/
> java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181)
> in my IDE was not and still is not working.
>
> Best regards,
> Mina
>
>
> On Wed, Mar 15, 2017 at 4:43 AM, Michael Noll 
> wrote:
>
> > Mina,
> >
> > in your original question you wrote:
> >
> > > However, I do not see the word count when I try to run below example.
> > Looks like that it does not connect to Kafka.
> >
> > The WordCount demo example writes its output to Kafka only --  it *does
> > not* write any results to the console/STDOUT.
> >
> > From what I can tell the WordCount example ran correctly because, in your
> > latest email, you showed the output of the console consumer (which *does*
> > write to the console), and that output is a list of words and counts:
> >
> > > all 1
> > > lead 1
> > > to 1
> > > hello 1
> > > streams 2
> > > join 1
> > > kafka 3
> > > summit 1
> >
> > In other words, I think everything you did was correct, and Kafka too was
> > working correctly.  You were simply unaware that the WordCount example
> does
> > not write its output to the console.
> >
> > Best,
> > Michael
> >
> >
> >
> >
> >
> > On Wed, Mar 15, 2017 at 6:14 AM, Mina Aslani 
> wrote:
> >
> > > Hi,
> > > I just checked streams-wordcount-output topic using below command
> > >
> > > docker run \
> > >
> > >   --net=host \
> > >
> > >   --rm \
> > >
> > >   confluentinc/cp-kafka:3.2.0 \
> > >
> > >   kafka-console-consumer --bootstrap-server localhost:9092 \
> > >
> > >   --topic streams-wordcount-output \
> > >
> > >   --from-beginning \
> > >
> > >   --formatter kafka.tools.DefaultMessageFormatter \
> > >
> > >   --property print.key=true \
> > >
> > >   --property key.deserializer=org.apache.ka
> > > fka.common.serialization.StringDeserializer \
> > >
> > >   --property value.deserializer=org.apache.
> > > kafka.common.serialization.LongDeserializer
> > >
> > >
> > > and it returns
> > >
> > > all 1
> > > lead 1
> > > to 1
> > > hello 1
> > > streams 2
> > > join 1
> > > kafka 3
> > > summit 1
> > >
> > > Please note above result is when I tried  http://docs.confluent.i
> > > o/3.2.0/streams/quickstart.html#goal-of-this-quickstart and in
> > > docker-machine  ran /usr/bin/kafka-run-class
> > org.apache.kafka.streams.examp
> > > les.wordcount.WordCountDemo.
> > >
> > > How come running same program out of docker-machine does not output to
> > the
> > > output topic?
> > > Should I make the program as jar and deploy to docker-machine and run
> it
> > > using ./bin/kafka-run-class?
> > >
> > > Best regards,
> > > Mina
> > >
> > >
> > >
> > > On Tue, Mar 14, 2017 at 11:11 PM, Mina Aslani 
> > > wrote:
> > >
> > > > I even tried http://docs.confluent.io/3.2.0/streams/quickstart.html#
> > > goal-
> > > > of-this-quickstart
> > > >
> > > > and in docker-machine  ran /usr/bin/kafka-run-class
> > > > org.apache.kafka.s

Re: Trying to use Kafka Stream

2017-03-15 Thread Michael Noll
Mina,

in your original question you wrote:

> However, I do not see the word count when I try to run below example.
Looks like that it does not connect to Kafka.

The WordCount demo example writes its output to Kafka only --  it *does
not* write any results to the console/STDOUT.

From what I can tell the WordCount example ran correctly because, in your
latest email, you showed the output of the console consumer (which *does*
write to the console), and that output is a list of words and counts:

> all 1
> lead 1
> to 1
> hello 1
> streams 2
> join 1
> kafka 3
> summit 1

In other words, I think everything you did was correct, and Kafka too was
working correctly.  You were simply unaware that the WordCount example does
not write its output to the console.

Best,
Michael





On Wed, Mar 15, 2017 at 6:14 AM, Mina Aslani  wrote:

> Hi,
> I just checked streams-wordcount-output topic using below command
>
> docker run \
>
>   --net=host \
>
>   --rm \
>
>   confluentinc/cp-kafka:3.2.0 \
>
>   kafka-console-consumer --bootstrap-server localhost:9092 \
>
>   --topic streams-wordcount-output \
>
>   --from-beginning \
>
>   --formatter kafka.tools.DefaultMessageFormatter \
>
>   --property print.key=true \
>
>   --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
>
>   --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
>
>
> and it returns
>
> all 1
> lead 1
> to 1
> hello 1
> streams 2
> join 1
> kafka 3
> summit 1
>
> Please note above result is when I tried
> http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-of-this-quickstart
> and in docker-machine ran /usr/bin/kafka-run-class
> org.apache.kafka.streams.examples.wordcount.WordCountDemo.
>
> How come running same program out of docker-machine does not output to the
> output topic?
> Should I make the program as jar and deploy to docker-machine and run it
> using ./bin/kafka-run-class?
>
> Best regards,
> Mina
>
>
>
> On Tue, Mar 14, 2017 at 11:11 PM, Mina Aslani 
> wrote:
>
> > I even tried http://docs.confluent.io/3.2.0/streams/quickstart.html#
> goal-
> > of-this-quickstart
> >
> > and in docker-machine  ran /usr/bin/kafka-run-class
> > org.apache.kafka.streams.examples.wordcount.WordCountDemo
> >
> > Running
> >
> > docker run --net=host --rm confluentinc/cp-kafka:3.2.0
> > kafka-console-consumer --bootstrap-server localhost:9092 --topic
> > streams-wordcount-output --new-consumer --from-beginning
> >
> > shows 8 blank messages
> >
> > Is there any setting/configuration should be done as running the class in
> > the docker-machine and running program outside the docker-machine does
> not
> > return expected result!
> >
> > On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani 
> wrote:
> >
> >> And the port for kafka is 29092 and for zookeeper 32181.
> >>
> >> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani 
> >> wrote:
> >>
> >>> Hi,
> >>>
> >>> I forgot to add in my previous email 2 questions.
> >>>
> >>> To setup my env, shall I use https://raw.githubusercont
> >>> ent.com/confluentinc/cp-docker-images/master/examples/kafka-
> >>> single-node/docker-compose.yml instead or is there any other
> >>> docker-compose.yml (version 2 or 3) which is suggested to setup env?
> >>>
> >>> How can I check "whether streams (that is just an app) can reach
> Kafka"?
> >>>
> >>> Regards,
> >>> Mina
> >>>
> >>> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani 
> >>> wrote:
> >>>
>  Hi Eno,
> 
>  Sorry! That is a typo!
> 
>  I have a docker-machine with different containers (setup as directed @
>  http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html)
> 
>  docker ps --format "{{.Image}}: {{.Names}}"
> 
>  confluentinc/cp-kafka-connect:3.2.0: kafka-connect
> 
>  confluentinc/cp-enterprise-control-center:3.2.0: control-center
> 
>  confluentinc/cp-kafka-rest:3.2.0: kafka-rest
> 
>  confluentinc/cp-schema-registry:3.2.0: schema-registry
> 
>  confluentinc/cp-kafka:3.2.0: kafka
> 
>  confluentinc/cp-zookeeper:3.2.0: zookeeper
> 
>  I used example @ https://github.com/confluent
>  inc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confl
>  uent/examples/streams/WordCountLambdaExample.java#L178-L181 and
>  followed the same steps.
> 
>  When I run below command in docker-machine, I see the messages in
>  TextLinesTopic.
> 
>  docker run --net=host --rm confluentinc/cp-kafka:3.2.0
> kafka-console-consumer
>  --bootstrap-server localhost:29092 --topic TextLinesTopic
> --new-consumer
>  --from-beginning
> 
>  hello kafka streams
> 
>  all streams lead to kafka
> 
>  join kafka summit
> 
>  test1
> 
>  test2
> 
>  test3
> 
>  test4
> 
>  Running above command for WordsWithCountsTopic returns nothing*.*
> 
>  My program runs out of docker machine,

Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-14 Thread Michael Noll
I see Jay's point, and I agree with much of it -- notably about being
careful which concepts we do and do not expose, depending on which user
group / user type is affected.  That said, I'm not sure yet whether or not
we should get rid of "Topology" (or a similar term) in the DSL.

For what it's worth, here's how related technologies define/name their
"topologies" and "builders".  Note that, in all cases, it's about
constructing a logical processing plan, which then is being executed/run.

- `Pipeline` (Google Dataflow/Apache Beam)
- To add a source you first instantiate the Source (e.g.
`TextIO.Read.from("gs://some/inputData.txt")`),
  then attach it to your processing plan via `Pipeline#apply()`.
  This setup is a bit different to our DSL because in our DSL the
builder does both, i.e.
  instantiating + auto-attaching to itself.
- To execute the processing plan you call `Pipeline#execute()`.
- `StreamingContext` (Spark): This setup is similar to our DSL.
- To add a source you call e.g.
`StreamingContext#socketTextStream("localhost", )`.
- To execute the processing plan you call `StreamingContext#execute()`.
- `StreamExecutionEnvironment` (Flink): This setup is similar to our DSL.
- To add a source you call e.g.
`StreamExecutionEnvironment#socketTextStream("localhost", )`.
- To execute the processing plan you call
`StreamExecutionEnvironment#execute()`.
- `Graph`/`Flow` (Akka Streams), as a result of composing Sources (~
`KStreamBuilder.stream()`) and Sinks (~ `KStream#to()`)
  into Flows, which are [Runnable]Graphs.
- You instantiate a Source directly, and then compose the Source with
Sinks to create a RunnableGraph:
  see signature `Source#to[Mat2](sink: Graph[SinkShape[Out], Mat2]):
RunnableGraph[Mat]`.
- To execute the processing plan you call `Flow#run()`.

In our DSL, in comparison, we do:

- `KStreamBuilder` (Kafka Streams API)
- To add a source you call e.g. `KStreamBuilder#stream("input-topic")`.
- To execute the processing plan you create a `KafkaStreams` instance
from `KStreamBuilder`
  (where the builder will instantiate the topology = processing plan to
be executed), and then
  call `KafkaStreams#start()`.  Think of `KafkaStreams` as our runner.
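
In code, that current pattern looks roughly like this (a sketch against the
0.10.2 API; topic names and configs are made up):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

// The builder assembles the logical processing plan (the "topology").
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> input = builder.stream("input-topic");
input.to("output-topic");

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

// KafkaStreams is the "runner" that executes the plan built above.
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();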

First, I agree with the sentiment that the current name of `KStreamBuilder`
isn't great (which is why we're having this discussion).  Also, that
finding a good name is tricky. ;-)

Second, even though I agree with many of Jay's points I'm not sure whether
I like the `StreamsBuilder` suggestion (i.e. any name that does not include
"topology" or a similar term) that much more.  It still doesn't describe
what that class actually does, and what the difference to `KafkaStreams`
is.  IMHO, the point of `KStreamBuilder` is that it lets you build a
logical plan (what we call "topology"), and `KafkaStreams` is the thing
that executes that plan.  I'm not yet convinced that abstracting these two
points away from the user is a good idea if the argument is that it's
potentially confusing to beginners (a claim which I am not sure is actually
true).

That said, if we'd rather favor "good-sounding but perhaps less technically
correct names", I'd argue we should not even use something like "Builder".
We could, for example, also pick the following names:

- KafkaStreams as the new name for the builder that creates the logical
plan, with e.g. `KafkaStreams.stream("intput-topic")` and
`KafkaStreams.table("input-topic")`.
- KafkaStreamsRunner as the new name for the executor of the plan, with
`KafkaStreamsRunner(KafkaStreams).run()`.



On Tue, Mar 14, 2017 at 5:56 AM, Sriram Subramanian 
wrote:

> StreamsBuilder would be my vote.
>
> > On Mar 13, 2017, at 9:42 PM, Jay Kreps  wrote:
> >
> > Hey Matthias,
> >
> > Make sense, I'm more advocating for removing the word topology than any
> > particular new replacement.
> >
> > -Jay
> >
> > On Mon, Mar 13, 2017 at 12:30 PM, Matthias J. Sax  >
> > wrote:
> >
> >> Jay,
> >>
> >> thanks for your feedback
> >>
> >>> What if instead we called it KStreamsBuilder?
> >>
> >> That's the current name and I personally think it's not the best one.
> >> The main reason why I don't like KStreamsBuilder is that we have the
> >> concepts of KStreams and KTables, and the builder creates both. However,
> >> the name puts the focus on KStream and devalues KTable.
> >>
> >> I understand your argument, and I am personally open to removing the
> >> "Topology" part and naming it "StreamsBuilder". Not sure what others
> >> think about this.
> >>
> >>
> >> About the Processor API: I like the idea in general, but I think it's out
> >> of scope for this KIP. KIP-120 focuses on removing leaking
> >> internal APIs and doing some cleanup of how our API reflects some concepts.
> >>
> >> However, I added your idea to the API discussion Wiki page and we can
> >> take it from there:
> >> https://cwiki.apache.org/confluence/display/KAFKA/
> >> Kafka+Streams+Discussions
> >>
> >>
> >>
> >> -Matthias
> >>

Re: Kafka Streams question

2017-03-14 Thread Michael Noll
Yes, of course.  You can also re-use any existing JSON and/or YAML library
to help you with that.

Also, in general, an application that uses the Kafka Streams API/library is
a normal, standard Java application -- you can of course also use any other
Java/Scala/... library for the application's processing needs.
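
A minimal sketch of the JSON-to-YAML case (illustrative only; it assumes
Jackson's jackson-dataformat-yaml module is on the classpath, that String
serdes are configured, and the topic names are made up):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

KStreamBuilder builder = new KStreamBuilder();
final ObjectMapper jsonMapper = new ObjectMapper();
final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());

KStream<String, String> json = builder.stream("json-topic");
KStream<String, String> yaml = json.mapValues(value -> {
    try {
        // Parse the JSON value and re-serialize the resulting tree as YAML.
        return yamlMapper.writeValueAsString(jsonMapper.readTree(value));
    } catch (Exception e) {
        // A real application would handle malformed input more gracefully.
        throw new RuntimeException(e);
    }
});
yaml.to("yaml-topic");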

-Michael



On Tue, Mar 14, 2017 at 9:00 AM, BYEONG-GI KIM  wrote:

> Dear Michael Noll,
>
> I have a question: Is it possible to convert JSON format to YAML format
> using Kafka Streams?
>
> Best Regards
>
> KIM
>
> 2017-03-10 11:36 GMT+09:00 BYEONG-GI KIM :
>
> > Thank you very much for the information!
> >
> >
> > 2017-03-09 19:40 GMT+09:00 Michael Noll :
> >
> >> There's actually a demo application that demonstrates the simplest use
> >> case
> >> for Kafka's Streams API:  to read data from an input topic and then
> write
> >> that data as-is to an output topic.
> >>
> >> https://github.com/confluentinc/examples/blob/3.2.x/kafka-
> >> streams/src/test/java/io/confluent/examples/streams/Pas
> >> sThroughIntegrationTest.java
> >>
> >> The code above is for Confluent 3.2 and Apache Kafka 0.10.2.
> >>
> >> The demo shows how to (1) write a message from a producer to the input
> >> topic, (2) use a Kafka Streams app to process that data and write the
> >> results back to Kafka, and (3) validating the results with a consumer
> that
> >> reads from the output topic.
> >>
> >> The GitHub project above includes many more such examples, see
> >> https://github.com/confluentinc/examples/tree/3.2.x/kafka-streams.
> >> Again,
> >> this is for Confluent 3.2 and Kafka 0.10.2.  There is a version
> >> compatibility matrix that explains which branches you need to use for
> >> older
> >> versions of Confluent/Kafka as well as for the very latest development
> >> version (aka Kafka's trunk):
> >> https://github.com/confluentinc/examples/tree/3.2.x/kafka-
> >> streams#version-compatibility
> >>
> >> Hope this helps!
> >> Michael
> >>
> >>
> >>
> >>
> >> On Thu, Mar 9, 2017 at 9:59 AM, BYEONG-GI KIM 
> wrote:
> >>
> >> > Hello.
> >> >
> >> > I'm a new who started learning the one of the new Kafka functionality,
> >> aka
> >> > Kafka Stream.
> >> >
> >> > As far as I know, the simplest usage of the Kafka Stream is to do
> >> something
> >> > like parsing, which forward incoming data from a topic to another
> topic,
> >> > with a few changing.
> >> >
> >> > So... Here is what I'd want to do:
> >> >
> >> > 1. Produce a simple message, like 1, 2, 3, 4, 5, ... from a producer
> >> > 2. Let Kafka Stream application consume the message and change the
> >> message
> >> > like [1], [2], [3], ...
> >> > 3. Consume the changed message at a consumer
> >> >
> >> > I've read the documentation,
> >> > https://kafka.apache.org/0102/javadoc/index.html?org/apache/
> >> kafka/connect,
> >> > but it's unclear for me how to implement it.
> >> >
> >> > Especially, I could not understand the the
> >> > line, builder.stream("my-input-topic").mapValues(value ->
> >> > value.length().toString()).to("my-output-topic"). Could someone
> >> explain it
> >> > and how to implement what I've mentioned?
> >> >
> >> > Thanks in advance.
> >> >
> >> > Best regards
> >> >
> >> > KIM
> >> >
> >>
> >
> >
>



-- 
*Michael G. Noll*
Product Manager | Confluent
+1 650 453 5860 | @miguno <https://twitter.com/miguno>
Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog
<http://www.confluent.io/blog>


Re: ~20gb of kafka-streams state, unexpected?

2017-03-10 Thread Michael Noll
In addition to what Eno already mentioned here's some quick feedback:

- Only for reference, I'd add that 20GB of state is not necessarily
"massive" in absolute terms.  I have talked to users whose apps manage much
more state than that (1-2 orders of magnitude more).  Whether or not 20 GB
is massive for your use case is a different question of course,
particularly if you expected much less than 20GB. ;-)

- One further option would be to use probabilistic data structures instead
of the default RocksDB or in-memory key-value stores.  I put up a POC that
demonstrates how to perform probabilistic counting with a Count-Min Sketch
backed state store [1].  I think a similar approach would also work for
e.g. Bloom filters, which might be one potential solution to downsize your
de-duplication problem.
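
As a very rough illustration of the Bloom filter idea (this is not the POC
from [1], which uses a Count-Min Sketch; it assumes Guava is on the
classpath, and the filter lives only in memory, so it is neither exact nor
fault-tolerant):

import java.nio.charset.StandardCharsets;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

// De-duplication sketch: only forward records whose key has (probably) not been seen yet.
public class DedupProcessor implements Processor<String, String> {

    private final BloomFilter<String> seen =
        BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 10_000_000, 0.01);

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        if (!seen.mightContain(key)) {   // probably the first time we see this key
            seen.put(key);
            context.forward(key, value); // forward only first occurrences
        }
    }

    @Override
    public void punctuate(long timestamp) { }

    @Override
    public void close() { }
}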

Best,
Michael



[1] https://github.com/confluentinc/examples/pull/100



On Fri, Mar 10, 2017 at 12:22 PM, Eno Thereska 
wrote:

> It’s not necessarily the wrong tool since deduplication is a standard
> scenario, but just setting expectations. If you have enough memory I wonder
> if it would make sense to do it all in-memory with an in-memory store.
> Depends on whether disk or memory space is at a premium.
>
> Thanks
> Eno
>
> > On Mar 10, 2017, at 11:05 AM, Ian Duffy  wrote:
> >
> > Hi Eno,
> >
> > Thanks for the fast response.
> >
> > We are doing a deduplication process here, so yes you are correct the
> keys
> > are normally unique. Sounds like a wrong tool for the job issue on my
> end.
> >
> > Thanks for your input here.
> >
> >
> >
> > On 10 March 2017 at 10:59, Eno Thereska  wrote:
> >
> >> Hi Ian,
> >>
> >> Sounds like you have a total topic size of ~20GB (96 partitions x
> 200mb).
> >> If most keys are unique then group and reduce might not be as effective
> in
> >> grouping/reducing. Can you comment on the key distribution? Are most
> keys
> >> unique? Or do you expect lots of keys to be the same in the topic?
> >>
> >> Thanks
> >> Eno
> >>
> >>
> >>> On Mar 10, 2017, at 9:05 AM, Ian Duffy  wrote:
> >>>
> >>> Hi All,
> >>>
> >>> I'm doing a groupBy and reduce on a kstream which results in a state
> >> store
> >>> being created.
> >>>
> >>> This state store is growing to be massive, its filled up a 20gb drive.
> >> This
> >>> feels very unexpected. Is there some cleanup or flushing process for
> the
> >>> state stores that I'm missing or is such a large size expected?
> >>>
> >>> The topic in question has 96 partitions and the state is about ~200mb
> >>> average for each one.
> >>>
> >>> 175M 1_0
> >>> 266M 1_1
> >>> 164M 1_10
> >>> 177M 1_11
> >>> 142M 1_12
> >>> 271M 1_13
> >>> 158M 1_14
> >>> 280M 1_15
> >>> 286M 1_16
> >>> 181M 1_17
> >>> 185M 1_18
> >>> 187M 1_19
> >>> 281M 1_2
> >>> 278M 1_20
> >>> 188M 1_21
> >>> 262M 1_22
> >>> 166M 1_23
> >>> 177M 1_24
> >>> 268M 1_25
> >>> 264M 1_26
> >>> 147M 1_27
> >>> 179M 1_28
> >>> 276M 1_29
> >>> 177M 1_3
> >>> 157M 1_30
> >>> 137M 1_31
> >>> 247M 1_32
> >>> 275M 1_33
> >>> 169M 1_34
> >>> 267M 1_35
> >>> 283M 1_36
> >>> 171M 1_37
> >>> 166M 1_38
> >>> 277M 1_39
> >>> 160M 1_4
> >>> 273M 1_40
> >>> 278M 1_41
> >>> 279M 1_42
> >>> 170M 1_43
> >>> 139M 1_44
> >>> 272M 1_45
> >>> 179M 1_46
> >>> 283M 1_47
> >>> 263M 1_48
> >>> 267M 1_49
> >>> 181M 1_5
> >>> 282M 1_50
> >>> 166M 1_51
> >>> 161M 1_52
> >>> 176M 1_53
> >>> 152M 1_54
> >>> 172M 1_55
> >>> 148M 1_56
> >>> 268M 1_57
> >>> 144M 1_58
> >>> 177M 1_59
> >>> 271M 1_6
> >>> 279M 1_60
> >>> 266M 1_61
> >>> 194M 1_62
> >>> 177M 1_63
> >>> 267M 1_64
> >>> 177M 1_65
> >>> 271M 1_66
> >>> 175M 1_67
> >>> 168M 1_68
> >>> 140M 1_69
> >>> 175M 1_7
> >>> 173M 1_70
> >>> 179M 1_71
> >>> 178M 1_72
> >>> 166M 1_73
> >>> 180M 1_74
> >>> 177M 1_75
> >>> 276M 1_76
> >>> 177M 1_77
> >>> 162M 1_78
> >>> 266M 1_79
> >>> 194M 1_8
> >>> 158M 1_80
> >>> 187M 1_81
> >>> 162M 1_82
> >>> 163M 1_83
> >>> 177M 1_84
> >>> 286M 1_85
> >>> 165M 1_86
> >>> 171M 1_87
> >>> 162M 1_88
> >>> 179M 1_89
> >>> 145M 1_9
> >>> 166M 1_90
> >>> 190M 1_91
> >>> 159M 1_92
> >>> 284M 1_93
> >>> 172M 1_94
> >>> 149M 1_95
> >>
> >>
>
>


Re: Streams - Got bit by log levels of the record cache

2017-03-10 Thread Michael Noll
I think a related JIRA ticket is
https://issues.apache.org/jira/browse/KAFKA-4829 (see Guozhang's comment
about the ticket's scope).

-Michael


On Thu, Mar 9, 2017 at 6:22 PM, Damian Guy  wrote:

> Hi Nicolas,
>
> Please do file a JIRA.
>
> Many thanks,
> Damian
>
> On Thu, 9 Mar 2017 at 15:54 Nicolas Fouché  wrote:
>
> > Hi,
> >
> > I just wanted to share how we misinterpreted logs from Streams at the
> INFO
> > level. Version 0.10.1.1, I think it's the same for 0.10.2.0.
> >
> > So, we configured `commit.interval.ms` and `cache.max.bytes.buffering`,
> > and
> > we expected to always reach the commit interval before the maximum bytes.
> > It was confirmed by looking at the logs the commit interval actually
> > triggered commits. No logs about the maximum bytes.
> >
> > But then I noticed in our monitoring graphs and by consuming output
> topics
> > that the rate of messages was way too high (thanks to pv[1]).
> >
> > I checked the code of Streams to write a kinda "sequence diagram" to see
> > what was logged and at what level.
> >
> > For `commit.interval.ms`:
> > - StreamThread#maybeCommit does an INFO recalling the configured duration
> > - then StreamThread#commitOne does one INFO per task.
> > - then NamedCache#flush announces the flushes at DEBUG level.
> >
> > For `cache.max.bytes.buffering`:
> > - ThreadCache#maybeEvict does a TRACE (?!?) to announce that the cache is
> > too big
> > - then NamedCache#flush announces the flushes at DEBUG level.
> >
> > So I think it makes the logs not exhaustive enough concerning the writes
> to
> > States + Changelog topics +  Output topics (Processor forwards). Thus
> logs
> > are easily misinterpreted.
> >
> > Should I file a JIRA ?
> >
> > [1] https://linux.die.net/man/1/pv
> >
> > - Nicolas
> >
>


Re: Kafka Streams question

2017-03-09 Thread Michael Noll
There's actually a demo application that demonstrates the simplest use case
for Kafka's Streams API:  to read data from an input topic and then write
that data as-is to an output topic.

https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/PassThroughIntegrationTest.java

The code above is for Confluent 3.2 and Apache Kafka 0.10.2.

The demo shows how to (1) write a message from a producer to the input
topic, (2) use a Kafka Streams app to process that data and write the
results back to Kafka, and (3) validating the results with a consumer that
reads from the output topic.

The GitHub project above includes many more such examples, see
https://github.com/confluentinc/examples/tree/3.2.x/kafka-streams.  Again,
this is for Confluent 3.2 and Kafka 0.10.2.  There is a version
compatibility matrix that explains which branches you need to use for older
versions of Confluent/Kafka as well as for the very latest development
version (aka Kafka's trunk):
https://github.com/confluentinc/examples/tree/3.2.x/kafka-streams#version-compatibility

Hope this helps!
Michael




On Thu, Mar 9, 2017 at 9:59 AM, BYEONG-GI KIM  wrote:

> Hello.
>
> I'm a newbie who started learning one of the new pieces of Kafka
> functionality, aka Kafka Streams.
>
> As far as I know, the simplest usage of Kafka Streams is to do something
> like parsing, which forwards incoming data from a topic to another topic
> with a few changes.
>
> So... Here is what I'd want to do:
>
> 1. Produce a simple message, like 1, 2, 3, 4, 5, ... from a producer
> 2. Let Kafka Stream application consume the message and change the message
> like [1], [2], [3], ...
> 3. Consume the changed message at a consumer
>
> I've read the documentation,
> https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/connect,
> but it's unclear for me how to implement it.
>
> In particular, I could not understand the
> line builder.stream("my-input-topic").mapValues(value ->
> value.length().toString()).to("my-output-topic"). Could someone explain it
> and how to implement what I've mentioned?
>
> Thanks in advance.
>
> Best regards
>
> KIM
>


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-09 Thread Michael Noll
Thanks for the update, Matthias.

+1 to the points 1,2,3,4 you mentioned.

Naming is always a tricky subject, but renaming KStreamBuilder
to StreamsTopologyBuilder looks ok to me (I would have had a slight
preference towards DslTopologyBuilder, but hey).  The most important aspect
is, IMHO, what you also pointed out:  to make it clear that the current
KStreamBuilder actually builds a topology (though the latter is currently
called `TopologyBuilder`), and not a `KStream`.





On Wed, Mar 8, 2017 at 11:43 PM, Matthias J. Sax 
wrote:

> Hi,
>
> sorry for not replying earlier and thanks for all your feedback. After
> some more discussions I updated the KIP. The new proposal takes some
> other design considerations into account, which I want to highlight
> briefly. Those considerations automatically resolve the concerns raised.
>
> First some answers:
>
> > The PAPI processors I use in my KStreams app are all functioning on
> KTable
> > internals.  I wouldn't be able to convert them to process()/transform().
> >
> > What's the harm in permitting both APIs to be used in the same
> application?
>
> It's not about "harm" but about design. We want to switch from a
> "inheritance" to a "composition" pattern.
>
> About the interface idea: using a shared interface would not help to get
> a composition pattern
>
>
> Next I want to give the design considerations leading to the updated KIP:
>
> 1) Using KStreamBuilder in the constructor of KafkaStreams is unnatural.
> KafkaStreams client executes a `Topology` and this execution should be
> independent of the way the topology is "put together", ie, low-level API
> or DSL.
>
> 2) Thus, we don't want to have any changes to KafkaStreams class.
>
> 3) Thus, KStreamBuilder needs to have a method `build()` that returns a
> `Topology` that can be passed into KafkaStreams.
>
> 4) Because `KStreamBuilder` should build a `Topology` I suggest to
> rename the new class to `StreamsTopologyBuilder` (the name
> TopologyBuilder would actually be more natural, but would be easily
> confused with old low-level API TopologyBuilder).
>
> Thus, PAPI and DSL can be mixed-and-matched with full power, as
> StreamsTopologyBuilder returns the created Topology via #build().
>
> I also removed `final` for both builder classes.
>
>
>
> With regard to the larger scope of the overal API redesign, I also want
> to point to a summary of API issues:
> https://cwiki.apache.org/confluence/display/KAFKA/
> Kafka+Streams+Discussions
>
> Thus, this KIP is only one building block of a larger improvement
> effort, and we hope to get as much as possible done for 0.11. If you
> have any API improvement ideas, please share them so we can come up with
> a holistic, sound design (instead of uncoordinated local improvements
> that might diverge).
>
>
>
> Looking forward to your feedback on this KIP and the other API issues.
>
>
>
> -Matthias
>
>
>
>
> On 2/15/17 7:36 PM, Mathieu Fenniak wrote:
> > On Wed, Feb 15, 2017 at 5:04 PM, Matthias J. Sax 
> > wrote:
> >
> >> - We also removed method #topologyBuilder() from KStreamBuilder because
> >> we think #transform() should provide all functionality you need to
> >> mix-an-match Processor API and DSL. If there is any further concern
> >> about this, please let us know.
> >>
> >
> > Hi Matthias,
> >
> > Yes, I'm sorry I didn't respond sooner, but I still have a lot of
> concerns
> > about this.  You're correct to point out that transform() can be used for
> > some of the output situations I pointed out; albeit it seems somewhat
> > awkward to do so in a "transform" method; what do you do with the retval?
> >
> > The PAPI processors I use in my KStreams app are all functioning on
> KTable
> > internals.  I wouldn't be able to convert them to process()/transform().
> >
> > What's the harm in permitting both APIs to be used in the same
> application?
> >
> > Mathieu
> >
>
>


Re: Can I create user defined stream processing function/api?

2017-03-07 Thread Michael Noll
There's also an end-to-end example for DSL and Processor API integration:
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/MixAndMatchLambdaIntegrationTest.java

Best,
Michael



On Tue, Mar 7, 2017 at 4:51 PM, LongTian Wang  wrote:

> Really appreciated, Matthias!
> That's what I wanted.
>
> Regards,
> Long Tian
> 
> From: Matthias J. Sax 
> Sent: 07 March 2017 07:48:08
> To: users@kafka.apache.org
> Subject: Re: Can I create user defined stream processing function/api?
>
> Hi,
>
> you can implement custom operators via process(), transform(), and
> transformValues().
>
> Also, if you want to have even more control over the topology, you can
> use low-level Processor API directly instead of DSL.
>
> http://docs.confluent.io/current/streams/developer-
> guide.html#processor-api
>
>
> -Matthias
>
> On 3/6/17 3:33 PM, Wang LongTian wrote:
> > Dear folks,
> >
> > Background: I'm learning Kafka Streams and want to use it in my product
> for real-time stream processing with data from various sensors.
> >
> > Question:
> > 1. Can I define my own processing function/API in Kafka Streams besides
> the predefined functions like groupBy(), count(), etc.?
> > 2. If I can define my own function, could you please give an example
> or point me to a GitHub project as an example?
> >
> > Thank u in advance!
> >
> > Regards,
> > Long Tian
> >
>
>


Re: Kafka streams DSL advantage

2017-03-06 Thread Michael Noll
The DSL has some unique features that aren't in the Processor API, such as:

- KStream and KTable abstractions.
- Support for time windows (tumbling windows, hopping windows) and session
windows.  The Processor API only has stream-time based `punctuate()`.
- Record caching, which is slightly better than state store caching for the
Processor API (
http://docs.confluent.io/current/streams/developer-guide.html#memory-management
)

You can re-implement most of these features in the processor API too (after
all, the DSL itself is based on the Processor API), but that's manual DIY
work.

That being said, you can also combine the DSL and the Processor API via the
DSL's `process()` and `transform` / `transformValues` operations.  See [1]
for an example.
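
To give a flavor of that mix-and-match style, a hypothetical wiring could
look like this (sketch only: MyValueTransformer is a made-up class
implementing ValueTransformer, the store/topic names are invented, and
String serdes are assumed):

import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.state.Stores;

KStreamBuilder builder = new KStreamBuilder();

// A state store the custom transformer can use; added to the topology up front.
builder.addStateStore(
    Stores.create("my-store").withStringKeys().withLongValues().persistent().build());

builder.<String, String>stream("input-topic")
       .filter((key, value) -> value != null)                 // plain DSL
       .transformValues(MyValueTransformer::new, "my-store")  // custom Processor-API logic
       .to("output-topic");                                   // back to plain DSL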

Best,
Michael



[1]
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/MixAndMatchLambdaIntegrationTest.java





On Sun, Mar 5, 2017 at 8:01 PM, Shimi Kiviti  wrote:

> Hi Eveyone,
>
> I have seen a few times (here in the mailing list) where someone wanted to
> use the Kafka Streams DSL for something that wasn't possible to do with the
> DSL and the suggestion was to use the Processor API.
>
> I was wondering, except the fluent functional code of the DSL, are there
> any other benefits of using the DSL over the processor API?
>
> thanks,
> Shimi
>


Re: Writing data from kafka-streams to remote database

2017-03-06 Thread Michael Noll
I'd use option 2 (Kafka Connect).

Advantages of #2:

- The code is decoupled from the processing code and easier to refactor in
the future. (same as #4)
- The runtime/uptime/scalability of your Kafka Streams app (processing) is
decoupled from the runtime/uptime/scalability of the data ingestion into
your remote database.

"Remove the need for additional kafka topic." isn't a big win typically --
even though topics aren't free, there still quite cheap. ;-)

YMMV of course. :-)


On Sun, Mar 5, 2017 at 7:55 PM, Shimi Kiviti  wrote:

> Thank Eno,
>
> Yes, I am aware of that. It indeed looks like a very useful feature.
>
> The result of the processing in Kafka Streams is only a small amount of
> data that is required by our service.
> Currently it makes more sense for us to update the remote database where we
> have more data that our application requires.
> Also, the data should be available in case of failures. The remote database
> data is replicated. AFAIK, although the RocksDB changelog is backed by Kafka,
> if a node fails, the data will be unavailable until it is replicated to a
> different node.
>
> On Sun, Mar 5, 2017 at 4:38 PM, Eno Thereska 
> wrote:
>
> > Hi Shimi,
> >
> > Could you tell us more about your scenario? Kafka Streams uses embedded
> > databases (RocksDb) to store it's state, so often you don't need to write
> > anything to an external database and you can query your streams state
> > directly from streams. Have a look at this blog if that matches your
> > scenario: https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/ .
> >
> > Cheers
> > Eno
> >
> > > On 5 Mar 2017, at 10:48, Shimi Kiviti  wrote:
> > >
> > > Hi Everyone,
> > >
> > > I was wondering about writing data to remote database.
> > > I see 4 possible options:
> > >
> > >   1. Read from a topic and write to the database.
> > >   2. Use kafka connect
> > >   3. Write from anywhere in kafka streams.
> > >   4. Register a CachedStateStore FlushListener that will send a batch
> of
> > >   records when the store flush the records.
> > >
> > > Advantages of #4:
> > >
> > >   - The code is decoupled from the processing code and easier to
> refactor
> > >   in the future.
> > >   - Remove the need for additional kafka topic.
> > >
> > >
> > > Thanks,
> > >
> > > Shimi
> >
> >
>


Re: Chatty StreamThread commit messages

2017-03-01 Thread Michael Noll
Good point, Steven.  +1 here.
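
As a stop-gap until that JIRA lands, and assuming a standard log4j setup, you
can silence the per-task commit lines by raising the level of the StreamThread
logger in your application's log4j.properties (logger name taken from the log
lines quoted below):

log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=WARN

Note that this also hides the other INFO messages from that class.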

On Wed, Mar 1, 2017 at 8:52 AM, Damian Guy  wrote:

> +1
> On Wed, 1 Mar 2017 at 07:15, Guozhang Wang  wrote:
>
> > Hey Steven,
> >
> > That is a good question, and I think your proposal makes sense. Could you
> > file a JIRA for this change to keep track of it?
> >
> > Guozhang
> >
> > On Tue, Feb 28, 2017 at 1:35 PM, Steven Schlansker <
> > sschlans...@opentable.com> wrote:
> >
> > > Hi everyone, running with Kafka Streams 0.10.2.0, I see this every
> commit
> > > interval:
> > >
> > > 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1]
> > o.a.k.s.p.internals.StreamThread
> > > - stream-thread [StreamThread-1] Committing task StreamTask 1_31
> > > 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1]
> > o.a.k.s.p.internals.StreamThread
> > > - stream-thread [StreamThread-1] Committing task StreamTask 2_31
> > >
> > > We have ~10 tasks in our topology, 4 topics, and 32 partitions per
> topic.
> > > This means every commit interval we log a few hundred lines of the
> above
> > > which is an order of magnitude chattier than anything else in the log
> > > during normal operations.
> > >
> > > Does anyone else find the chattiness of this message annoying?  I would
> > > personally prefer something more like:
> > >
> > > # existing message is fine at TRACE level for diagnostics
> > > "TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31"
> > > # normal fast case, wrap them all up into one summary line
> > > "INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms"
> > > # some kind of threshold / messaging in case it doesn't complete
> quickly
> > > or logs an exception
> > > "ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in
> > 100ms"
> > >
> > > Thoughts?
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


Re: Kafka Streams vs Spark Streaming

2017-02-28 Thread Michael Noll
Sachin,

disabling (change)logging for state stores disables the fault tolerance of
the state store -- i.e. changes to the state store will not be backed up to
Kafka, regardless of whether the store uses RocksDB, an in-memory store, or
something else.


> When disabling this in 0.10.2, what does this exactly mean?

See above.


> Does this mean that no RocksDB state store would get created any longer?

No, local state stores will still be created.  By default, the storage
engine is RocksDB, so if you disable changelogging then you will still have
local RocksDB stores (as usual) but those stores will not be backed up to
Kafka behind the scenes.  If, in this situation, you lose a machine that
has local RocksDB stores, then this state data is lost, too.

So there are two different things at play here:

1. Whether you want to enable or disable (change)logging of a state store,
and thus to enable/disable fault-tolerant state stores.

2. Which storage engine you want to use for the state stores.  The default
is RocksDB.

If, for (2), you do not want to have RocksDB state stores, you can switch
the storage engine to e.g. the in-memory store.  However, when you do
switch from RocksDB to in-memory then all your state store's data must fit
into memory (obviously), otherwise you'll run OOM.

In summary, you can have either of the following:

a. RocksDB state stores with changelogging enabled (= fault-tolerant
stores).

b. RocksDB state stores with changelogging disabled (= stores are not
fault-tolerant, you may suffer from data loss during e.g. machine failures).

c. In-memory state stores with changelogging enabled (= fault-tolerant
stores). But careful: you may run OOM if the state data does not fit into
the available memory.

d. In-memory state stores with changelogging disabled (= stores are not
fault-tolerant, you may suffer from data loss during e.g. machine
failures). But careful: you may run OOM if the state data does not fit into
the available memory.
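
For reference, a minimal sketch of variant (b) via the state store builder
(assuming Kafka Streams 0.10.1/0.10.2; the store name "my-store" is made up):

import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

StateStoreSupplier myStore = Stores.create("my-store")
    .withStringKeys()
    .withStringValues()
    .persistent()       // RocksDB-backed local store
    .disableLogging()   // no changelog topic, i.e. not fault-tolerant
    .build();

For the DSL, the corresponding switch is the enable/disable changelog setting
from the developer guide link in your question.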


Hope this helps,
Michael




On Tue, Feb 28, 2017 at 8:01 AM, Sachin Mittal  wrote:

> I had a question regarding
> http://docs.confluent.io/3.1.2/streams/developer-guide.
> html#enable-disable-state-store-changelogs
>
> When disabling this in 0.10.2, what does this exactly mean?
> Does this mean that no RocksDB state store would get created any longer?
>
> On this subject we had started with spark streaming, but we ran into memory
> issues and the hardware we have got is not so fantastic to support spark
> streaming.
>
> So we switched to high level DSL kafka streaming .
>
> I think if your source is kafka queues, kafka streaming is good and simple
> to use. However you need to plan ahead and anticipate the (max) load and
> create adequate partitions based on some key on which aggregations can be
> performed independently.
>
> Then you can run cluster of stream threads (same and multiple machines),
> each processing a partition.
>
> Having said this, we however ran into a lot of issues with frequent stream
> re-balances, especially when we have multiple instances of rocks db running
> on a single machine.
> Now we don't know if this is some bad VM configuration issue or some
> problem with kafka streams/rocks db integration, we are still working on
> that.
>
> So I would suggest if you partition your data well enough and have single
> streams thread consuming only one partition and not many instances of
> rocksdb created on a single machine, the overall applications runs fine.
> Also make sure not to create big time windows and set a not so long
> retention time, so that state stores size is limited.
>
> We use a sliding 5 minutes window of size 10 minutes and retention of 30
> minutes and see overall performance much better than say 30 minutes sliding
> of size 1 hour and retention of 3 hours.
>
> So to conclude if you can manage rocks db, then kafka streams is good to
> start with, its simple and very intuitive to use.
>
> Again on rocksdb side, is there a way to eliminate that and is
>
> disableLogging
>
> for that?
>
> Thanks
> Sachin
>
>
>
> On Mon, Feb 27, 2017 at 7:47 PM, Michael Noll 
> wrote:
>
> > > Also, is it possible to stop the syncing between state stores to
> brokers,
> > if I am fine with failures?
> >
> > Yes, you can disable the syncing (or the "changelog" feature) of state
> > stores:
> > http://docs.confluent.io/current/streams/developer-
> > guide.html#enable-disable-state-store-changelogs
> >
> > > I do have a Spark Cluster, but I am not convinced how Spark Streaming
> can
> > do this differently.
> > > Guozhang, could you comment anything regarding Kafka Streams vs Spark
> > Streaming, especially
> > > in terms of aggregations/groupbys/joins implementation 

Re: Kafka Streams vs Spark Streaming

2017-02-27 Thread Michael Noll
> Also, is it possible to stop the syncing between state stores to brokers,
if I am fine with failures?

Yes, you can disable the syncing (or the "changelog" feature) of state
stores:
http://docs.confluent.io/current/streams/developer-guide.html#enable-disable-state-store-changelogs

> I do have a Spark Cluster, but I am not convinced how Spark Streaming can
do this differently.
> Guozhang, could you comment anything regarding Kafka Streams vs Spark
Streaming, especially
> in terms of aggregations/groupbys/joins implementation logic?

As you are hinting at yourself, if you want fault-tolerant state, then this
fault tolerance comes at a price (in Kafka Streams, this is achieved by
changelog-ing state stores).  Other tools such as Flink or Spark work in a
similar fashion, there's no free lunch.

One option, which you brought up above, is to disable the fault tolerance
functionality for state by disabling the changelogs of state stores (see
above).  Another option is to leverage Kafka's record caching for Kafka
Streams, which does lower the amount of data that is sent across the
network (from your app's state store changelogs to the Kafka cluster and
vice versa), though you may need to tune some parameters in your situation
because your key space has high cardinality and message volume per key is
relatively low (= you don't benefit as much from record caching as most
other users/use cases).
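
For reference, the two knobs that control record caching look roughly like
this (a sketch assuming Kafka Streams 0.10.1+; the concrete values below are
placeholders to tune for your workload):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Total bytes of record cache, shared across all threads of this instance.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// A longer commit interval allows more deduplication in the cache (less
// changelog traffic) at the cost of higher latency for downstream updates.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);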


-Michael




On Mon, Feb 27, 2017 at 2:42 PM, Tianji Li  wrote:

> Hi Guozhang and Kohki,
>
> Thanks for your replies.
>
> I think I know how to deal with partitioning now, but I am still not sure
> how to deal with the traffic between the hidden state store sizes and Kafka
> Brokers (same as Kohki).
>
> I feel like the easiest thing to do is to set a larger commit window, so
> that the state stores are synced to brokers slower than default.
>
> I do have a Spark Cluster, but I am not convinced how Spark Streaming can
> do this differently. Guozhang, could you comment anything regarding Kafka
> Streams vs Spark Streaming, especially in terms of
> aggregations/groupbys/joins implementation logic?
>
> Also, is it possible to stop the syncing between state stores to brokers,
> if I am fine with failures?
>
> Thanks
> Tianji
>
>
> On 2017-02-26 23:52 (-0500), Guozhang Wang  wrote:
> > Hello Tianji,
> >
> > As Kohki mentioned, in Streams joins and aggregations are always done
> > pre-partitioned, and hence locally. So there won't be any inter-node
> > communications needed to execute the join / aggregations. Also they can
> be
> > hosted as persistent local state stores so you don't need to keep them in
> > memory. So for example if you partition your data with K1 / K2, then data
> > with the same values in combo (K1, K2) will always go to the same
> > partition, and hence good for aggregations / joins on either K1, K2, or
> > combo(K1, K2), but not sufficient for combo(K1, K2, K3, K4), as data with
> > the same values of K3 / K4 might still go to different partitions
> > processed by different Streams instances.
> >
> > So what you want is really to partition based on the "maximum superset"
> of
> > all the involved keys. Note that with the superset of all the keys one
> > thing to watch out is the even distribution of the partitions. If it is
> not
> > evenly distributed, then some instance might become hot points. This can
> be
> > tackled by customizing the "PartitionGrouper" interface in Streams, which
> > indicates which set of partitions will be assigned to each of the tasks
> (by
> > default each one partition from the source topics will form a task, and
> > task is the unit of parallelism in Streams).
> >
> > Hope this helps.
> >
> > Guozhang
> >
> >
> > On Sun, Feb 26, 2017 at 10:57 AM, Kohki Nishio 
> wrote:
> >
> > > Tianji,
> > > KStream is indeed Append mode as long as I do stateless processing, but
> > > when you do aggregation that is a stateful operation and it turns to
> KTable
> > > and that does Update mode.
> > >
> > > In regard to your aggregation, I believe Kafka's aggregation works for
> a
> > > single partition not over multiple partitions, are you doing 100
> > > different aggregation against record key ? Then you should have a
> single
> > > data object for those 100 values, anyway it sounds like we have similar
> > > problem ..
> > >
> > > -Kohki
> > >
> > >
> > >
> > >
> > >
> > > On Sat, Feb 25, 2017 at 1:11 PM, Tianji Li  wrote:
> > >
> > > > Hi Kohki,
> > > >
> > > > Thanks very much for providing your investigation results. Regarding
> > > > 'append' mode with Kafka Streams, isn't KStream the thing you want?
> > > >
> > > > Hi Guozhang,
> > > >
> > > > Thanks for the pointers to the two blogs. I read one of them before
> and
> > > > just had a look at the other one.
> > > >
> > > > What I am hoping to do is below, can you help me decide if Kafka
> Stream
> > > is
> > > > a good fit?
> > > >
> > > > We have a few data sources, and we are hoping to correlate these
> sources,
> > > > and then d

Re: Simple data-driven app design using Kafka

2017-02-23 Thread Michael Noll
Pete,

have you looked at Kafka's Streams API yet?

There are many examples available in the `kafka-streams` folder at
https://github.com/confluentinc/examples.  The simplest example of "Do sth
to a new data record as soon as it arrives" might be
the MapFunctionLambdaExample.  You can create different KStream instances
for different topics and then use a different map() operation for each
KStream, for example.
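
A rough sketch of that per-topic pattern, assuming Kafka 0.10.1+; the topic
names come from your example, while DoFoo/DoBar stand for your own functions
and the config values are placeholders:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");             // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

KStreamBuilder builder = new KStreamBuilder();

KStream<String, String> foo = builder.stream("foo");
KStream<String, String> bar = builder.stream("bar");

// Topic-specific handling: a pure side effect for "foo" ...
foo.foreach((key, value) -> DoFoo(value));

// ... and a transformation whose result is written back to Kafka for "bar".
bar.mapValues(value -> DoBar(value)).to("bar-processed");

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();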

-Michael




On Thu, Feb 23, 2017 at 4:39 AM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> Pete,
>
> I think this excellent post covers what you are looking for:
> https://www.confluent.io/blog/tutorial-getting-started-with-
> the-new-apache-kafka-0-9-consumer-client/
>
> --Vahid
>
>
>
>
> From:   Peter Figliozzi 
> To: users@kafka.apache.org
> Date:   02/22/2017 07:29 PM
> Subject:Simple data-driven app design using Kafka
>
>
>
> Hello Kafka Users,
>
> I started using Kafka a couple of weeks ago an am very impressed!  I've
> gotten the hang of producing, and now it's time for consuming.  My
> applications (Scala) don't work quite like the examples, but I think it's
> a
> pretty basic architecture:
>
>
>- Suppose you have a several topics: foo, bar, and baz
>- When a new data element arrives in a particular topic, perform the
>topic-specific task with the new data i.e. DoFoo(newfoo)
>- Otherwise, do nothing
>
>
> Can anyone point to an example or even sketch it out here?
>
> Thanks much,
>
> Pete
>
>
>
>
>


Re: KTable send old values API

2017-02-22 Thread Michael Noll
Dmitry,

I think your use case is similar to the one I described in the link below
(discussion in the kafka-dev mailing list):
http://search-hadoop.com/m/uyzND1rVOQ12OJ84U&subj=Re+Streams+TTLCacheStore

Could you take a quick look?

-Michael




On Wed, Feb 22, 2017 at 12:39 AM, Dmitry Minkovsky 
wrote:

> Hi Eno,
>
> Thank you. I don't think I'm advanced enough to imagine a good API. But I
> can elaborate my use-cases further.
>
> So say I have two tables:
>
> KTable<String, String> left = topology.table(stringSerde,
> stringSerde, topicLeft, topicLeft);
> KTable<String, String> right = topology.table(stringSerde,
> stringSerde, topicRight, topicRight);
>
> left
>   .leftJoin(right, (l, r) -> asList(l, r))
>   .to(topicView)
>
>
>- I'd like to filter duplicates out of the change stream. I only want
>topicView to receive proper updates.
>
>- I'd like to be able to detect change type easy:
>
>- oldValue == null and newValue != null => create
>   - oldValue != null and newValue == null => delete
>   - oldValue != null and newValue != null => update
>
>   - I'd like to be able to update indices when records are deleted or
>updated. Old values are needed to determine which index keys which
> should
>be updated or removed.
>
>
> I can do all these things now, mostly with groupBy()/reduce(),
> groupBy/aggregate() and transform().
>
>
> Best,
> Dmitry
>
> On Tue, Feb 21, 2017 at 5:21 PM, Eno Thereska 
> wrote:
>
> > Hi Dmitry,
> >
> > Could you tell us more on the exact API you'd like? Perhaps if others
> find
> > it useful too we/you can do a KIP.
> >
> > Thanks
> > Eno
> >
> > > On 21 Feb 2017, at 22:01, Dmitry Minkovsky 
> wrote:
> > >
> > > At KAFKA-2984: ktable sends old values when required
> > > , @ymatsuda
> > > writes:
> > >
> > >> NOTE: This is meant to be used by aggregation. But, if there is a use
> > > case like a SQL database trigger, we can add a new KTable method to
> > expose
> > > this.
> > >
> > > Looking through the source it does not seem that this API was ever
> > exposed.
> > > Not finding anything on Google on this subject either. The SQL database
> > > trigger is my exact use case. Enabling change-streaming for some tables
> > > would help simplify my code. Is this possible? Is this scheduled for a
> > > future version?
> > >
> > > Thank you,
> > > Dmitry
> >
> >
>



-- 
*Michael G. Noll*
Product Manager | Confluent
+1 650 453 5860 | @miguno 
Follow us: Twitter  | Blog



Re: Kafka streams: Getting a state store associated to a processor

2017-02-14 Thread Michael Noll
> By the way - do I understand correctly that when a state store is
persistent, it is logged by default?

Yes.

> So enableLogging(Map) only is a way to provide default configuration to
the default logging?

Yes.  That is, any configs that should be applied to the state store's
changelog topic.

> And, logging can be disabled with .disableLogging(), in which case the
state store is persisted
> on disk using RocksDB, but is not backed by a Kafka topic for recovery in
case of loosing the rocksdb file?

Yes.






On Tue, Feb 14, 2017 at 12:57 AM, Adam Warski  wrote:

> Hello,
>
> > Can you try this out with 0.10.2 branch or current trunk?
>
> With 0.10.2 this works fine! The state-store changelog is created with 3
> partitions if the source topic has 3 partitions.
> I checked with the client from
> https://repository.apache.org/content/groups/staging/org/apache/kafka/kafka_2.11/0.10.2.0/
> and the server from http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/
>
> Also, using 0.10.2 made these warnings go away:
> 2017-02-13 19:57:12 WARN  ConsumerConfig:188 - The configuration
> 'replication.factor' was supplied but isn't a known config.
> 2017-02-13 19:57:12 WARN  ConsumerConfig:188 - The configuration '
> windowstore.changelog.additional.retention.ms' was supplied but isn't a
> known config.
>
> Thanks!
>
> >> Or the auto-created topic should use the partition count of the
> incoming streams (though I’m not sure that’s always possible - a store
> might be used multiple times).
> >
> > This should happen. If not, there is a bug. Can you reliable reproduce
> > this issue?
>
> Yes, I’m was testing using a fresh 0.10.1.1 download, one local broker,
> default configuration.
>
> >> Finally, is there a way to specify how many partitions should
> auto-created topics have when creating a state store? I tried with:
> >
> > The number of partitions are computed based on the number of tasks, that
> > depend on the number of input partitions. Thus, you cannot configure it.
> > Because the topic must have a specific number of partitions go guarantee
> > correct results, there is nothing to "tune" and thus you cannot
> > configure anything.
>
> Ah, makes sense.
>
> By the way - do I understand correctly that when a state store is
> persistent, it is logged by default? So enableLogging(Map) only is a way to
> provide default configuration to the default logging?
>
> And, logging can be disabled with .disableLogging(), in which case the
> state store is persisted on disk using RocksDB, but is not backed by a
> Kafka topic for recovery in case of loosing the rocksdb file?
>
> Thanks again,
> Adam
>
> --
> Adam Warski
>
> http://twitter.com/#!/adamwarski 
> http://www.softwaremill.com 
> http://www.warski.org 
>


Re: Kafka streams: Getting a state store associated to a processor

2017-02-13 Thread Michael Noll
Adam,

also an FYI: The upcoming 0.10.2 version of the Streams API will be
backwards compatible with 0.10.1 clusters, so you can keep your brokers on
0.10.1.1 and still use the latest Streams API version (including the one
from trunk, as Matthias mentioned).

-Michael



On Mon, Feb 13, 2017 at 1:04 PM, Matthias J. Sax 
wrote:

> Can you try this out with 0.10.2 branch or current trunk?
>
> We put some fixed like you suggested already. Would be nice to get
> feedback if those fixed resolve the issue for you.
>
> Some more comments inline.
>
> -Matthias
>
> On 2/13/17 12:27 PM, Adam Warski wrote:
> > Following this answer, I checked that the auto-created
> "app1-store1-changelog” topic had 1 partition - which caused the problem.
> > Creating this topic upfront with 3 partitions (which matches the stream
> source partition count) fixes the problem.
>
> Makes sense -- however, I am wondering why manually creating the topic
> is required. After you removed all data from /tmp/[zk|logs|streams] the
> topic should be deleted and thus should get created with 3 partitions...
>
> >
> > However, I think this should be handled somehow differently … maybe the
> exception should report the partition count mismatch?
>
> Should happen for 0.10.2
>
> > Or the auto-created topic should use the partition count of the incoming
> streams (though I’m not sure that’s always possible - a store might be used
> multiple times).
>
> This should happen. If not, there is a bug. Can you reliable reproduce
> this issue?
>
> >
> > Finally, is there a way to specify how many partitions should
> auto-created topics have when creating a state store? I tried with:
>
> The number of partitions are computed based on the number of tasks, that
> depend on the number of input partitions. Thus, you cannot configure it.
> Because the topic must have a specific number of partitions go guarantee
> correct results, there is nothing to "tune" and thus you cannot
> configure anything.
>
> >
> > Map<String, String> storeCfg = new HashMap<>();
> > storeCfg.put("num.partitions", "3");
> >
> > StateStoreSupplier testStore = Stores.create("store1")
> > .withStringKeys()
> > .withStringValues()
> > .persistent()
> > .enableLogging(storeCfg)
> > .build();
> >
> > but that didn’t help.
> >
> > Thanks,
> > Adam
> >
> >> On 13 Feb 2017, at 20:57, Adam Warski  wrote:
> >>
> >>
> >>> If you increase the number of partitions in the topic "topic1" after
> the
> >>> state store is created, you'd need to manually increase the number of
> >>> partitions in the "app1-store1-changelog" topic as well.  Or remove the
> >>> topic and let KS recreate it next run.  But, either way, hopefully you
> >>> don't need the data in it, 'cause it won't match the partitioning of
> the
> >>> input topic. :-)
> >>
> >> I’m running these test on “clean” Kafka - only “topic1” is manually
> created (with 1 or 3 partitions).
> >> Removing existing data was the first thing I did after getting the
> exception :)
> >> (to be more specific I’m removing /tmp/zookeeper, /tmp/kafka-logs,
> /tmp/kafka-streams)
> >>
> >> Thanks,
> >> Adam
> >>
> >>> On Mon, Feb 13, 2017 at 11:59 AM, Adam Warski  > wrote:
> >>>
>  Hello,
> 
>  I have a simple example (or so it would seem) of a stream processor
> which
>  uses a persistent state store. Testing on one local Kafka (0.10.1.1)
> node,
>  this starts up without problems for a topic with 1 partition.
> However, if I
>  create a topic with 3 partitions I’m getting the following exception
>  shortly after the init() method of the Processor is called (init
> completes
>  without problems):
> 
>  2017-02-13 18:41:18 ERROR StreamThread:666 - stream-thread
>  [StreamThread-1] Failed to create an active task 0_1:
>  org.apache.kafka.streams.errors.StreamsException: task [0_1] Store
>  store1's change log (app1-store1-changelog) does not contain
> partition 1
>  at org.apache.kafka.streams.processor.internals.
>  ProcessorStateManager.register(ProcessorStateManager.java:185)
>  at org.apache.kafka.streams.processor.internals.
>  ProcessorContextImpl.register(ProcessorContextImpl.java:123)
>  at org.apache.kafka.streams.state.internals.RocksDBStore.
>  init(RocksDBStore.java:169)
>  at org.apache.kafka.streams.state.internals.
>  MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
>  at org.apache.kafka.streams.processor.internals.AbstractTask.
>  initializeStateStores(AbstractTask.java:81)
>  at org.apache.kafka.streams.processor.internals.
>  StreamTask.<init>(StreamTask.java:119)
> 
>  The code is essentially:
> 
>  StateStoreSupplier testStore = Stores.create("store1")
>  .withStringKeys()
>  .withStringValues()
>  .persistent()
>  .build();
> 
>  TopologyBuilder builder = new TopologyBuilder();
> 
>

Re: KIP-121 [Discuss]: Add KStream peek method

2017-02-07 Thread Michael Noll
Many thanks for the KIP and the PR, Steven!

My opinion, too, is that we should consider including this.

One thing that I would like to see clarified is the difference between the
proposed peek() and existing functions map() and foreach(), for instance.
My understanding (see also the Java 8 links below) is that:

- Like `map`, `peek` will return a KStream.  This also means that, unlike
`foreach`, `peek` is not a terminal operation.
- The main purpose of `peek` is, similar to `foreach`, the *side effects*
(such as the metrics counter example in the KIP) -- and, on a related note,
also to express your *intent* to achieve such side effects in the first
place (which is similar to when to use `foreach` rather than `map`); and
typically you should not (must not?) modify the underlying stream itself
(unlike `map`, which is supposed to do exactly that).

For reference, here are the descriptions of peek, map, foreach in Java 8.
I could have also included links to StackOverflow questions where people
were confused about when (not) to use peek. ;-)

https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#peek-java.util.function.Consumer-
https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#map-java.util.function.Function-
https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#forEach-java.util.function.Consumer-
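
To make the intended usage concrete, here is a small sketch based on the
KIP's proposal (the exact signature is of course subject to this discussion;
`builder` is an existing KStreamBuilder and the topic names are made up):

import java.util.concurrent.atomic.AtomicLong;

final AtomicLong processedRecords = new AtomicLong();  // e.g. feeding a metrics counter

KStream<String, String> input = builder.stream("input-topic");

input
    .peek((key, value) -> processedRecords.incrementAndGet())  // side effect only
    .mapValues(value -> value.toUpperCase())  // peek() did not modify the stream
    .to("output-topic");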

Best wishes,
Michael






On Tue, Feb 7, 2017 at 10:37 AM, Damian Guy  wrote:

> Hi Steven,
> Thanks for the KIP. I think this is a worthy addition to the API.
>
> Thanks,
> Damian
>
> On Tue, 7 Feb 2017 at 09:30 Eno Thereska  wrote:
>
> > Hi,
> >
> > I like the proposal, thank you. I have found it frustrating myself not to
> > be able to understand simple things, like how many records have been
> > currently processed. The peek method would allow those kinds of
> diagnostics
> > and debugging.
> >
> > Gwen, it is possible to do this with the existing functionality like map,
> > but you'd have to fake the map method. Also, it is not great using map
> for
> > things it was not intended for. Having an explicit peek makes it clearer
> in
> > my opinion.
> >
> > Thanks
> > Eno
> >
> > > On 7 Feb 2017, at 03:20, Gwen Shapira  wrote:
> > >
> > > I've read the wiki and am unclear about the proposal. Can you provide
> > > something like a Javadoc for peek()? What would this method do?
> > >
> > > Also, forgive me if I'm missing an important point here, but can't I
> > > put the println statement in a map()?
> > >
> > > On Mon, Feb 6, 2017 at 5:48 PM, Matthias J. Sax  >
> > wrote:
> > >> Steven,
> > >>
> > >> Thanks for your KIP. I move this discussion to dev mailing list --
> KIPs
> > >> need to be discussed there (and can be cc'ed to user list).
> > >>
> > >> Can you also add the KIP to the table "KIPs under discussion":
> > >>
> > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/
> Kafka+Improvement+Proposals#KafkaImprovementProposals-KIPsunderdiscussion
> > >>
> > >>
> > >> Thanks.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 2/6/17 3:35 PM, Steven Schlansker wrote:
> > >>> Hello users@kafka,
> > >>>
> > >>> I would like to propose a small KIP on the Streams framework
> > >>> that simply adds a KStream#peek implementation.
> > >>>
> > >>>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 121%3A+Add+KStream+peek+method
> > >>> https://issues.apache.org/jira/browse/KAFKA-4720
> > >>> https://github.com/apache/kafka/pull/2493
> > >>>
> > >>> Please consider my contribution and hopefully you all like it and
> > agree that it should be merged into 0.10.3 :)
> > >>> If not, be gentle, this is my first KIP!
> > >>>
> > >>> Happy Monday,
> > >>> Steven
> > >>>
> > >>
> > >
> > >
> > >
> > > --
> > > Gwen Shapira
> > > Product Manager | Confluent
> > > 650.450.2760 | @gwenshap
> > > Follow us: Twitter | blog
> >
> >
>


Re: Kafka docs for current trunk

2017-01-31 Thread Michael Noll
Thanks for bringing this up, Matthias.

+1

On Wed, Feb 1, 2017 at 8:15 AM, Gwen Shapira  wrote:

> +1
>
> On Tue, Jan 31, 2017 at 5:57 PM, Matthias J. Sax 
> wrote:
> > Hi,
> >
> > I want to collect feedback about the idea to publish docs for current
> > trunk version of Apache Kafka.
> >
> > Currently, docs are only published for official release. Other projects
> > also have docs for current SNAPSHOT version. So the question rises, if
> > this would be helpful for Kafka community, too.
> >
> > The idea would be, to update SNAPSHOT docs (web page and JavaDocs) on a
> > daily basis based on trunk (of course, fully automated).
> >
> >
> > Looking forward to your feedback.
> >
> >
> > -Matthias
> >
> >
>
>


Re: Fwd: [DISCUSS] KIP-114: KTable materialization and improved semantics

2017-01-27 Thread Michael Noll
ons should be updated accordingly. For example,
>>>>>>>>
>>>>>>> 1)
>>>>>
>>>>>> KStreamBuilder.table(..) has a state store name parameter, and we will
>>>>>>>> always materialize the KTable unless its state store name is set to
>>>>>>>>
>>>>>>> null;
>>>>>
>>>>>> 2) KTable.agg requires the result KTable to be materialized, and hence
>>>>>>>>
>>>>>>> it
>>>>>
>>>>>> also have a state store name; 3) KTable.join requires the joining
>>>>>>>> table
>>>>>>>>
>>>>>>> to
>>>>>>>
>>>>>>>> be materialized. And today we do not actually have a mechanism to
>>>>>>>>
>>>>>>> enforce
>>>>>
>>>>>> that, but will only throw an exception at runtime if it is not (e.g.
>>>>>>>> if
>>>>>>>>
>>>>>>> you
>>>>>>>
>>>>>>>> have "builder.table("topic", null).join()" a RTE will be thrown).
>>>>>>>>
>>>>>>>> I'd make an extended proposal just to kick off the discussion here:
>>>>>>>>
>>>>>>> let's
>>>>>
>>>>>> remove all the state store params in other KTable functions, and if in
>>>>>>>>
>>>>>>> some
>>>>>>>
>>>>>>>> cases KTable have to be materialized (e.g. KTable resulted from
>>>>>>>>
>>>>>>> KXX.agg)
>>>>>
>>>>>> and users do not call materialize(), then we treat it as "users are
>>>>>>>> not
>>>>>>>> interested in querying it at all" and hence use an internal name
>>>>>>>>
>>>>>>> generated
>>>>>>>
>>>>>>>> for the materialized KTable; i.e. although it is materialized the
>>>>>>>> state
>>>>>>>> store is not exposed to users. And if users call materialize()
>>>>>>>>
>>>>>>> afterwards
>>>>>
>>>>>> but we have already decided to materialize it, we can replace the
>>>>>>>>
>>>>>>> internal
>>>>>>>
>>>>>>>> name with the user's provided names. Then from a user's point-view,
>>>>>>>> if
>>>>>>>>
>>>>>>> they
>>>>>>>
>>>>>>>> ever want to query a KTable, they have to call materialize() with a
>>>>>>>>
>>>>>>> given
>>>>>
>>>>>> state store name. This approach has one awkwardness though, that
>>>>>>>> serdes
>>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>>> state store names param are not separated and could be overlapped
>>>>>>>> (see
>>>>>>>> detailed comment #2 below).
>>>>>>>>
>>>>>>>>
>>>>>>>> 2. This step does not need to be included in this KIP, but just as a
>>>>>>>> reference / future work: as we have discussed before, we may enforce
>>>>>>>> materialize KTable.join resulted KTables as well in the future. If
>>>>>>>> we
>>>>>>>>
>>>>>>> do
>>>>>
>>>>>> that, then:
>>>>>>>>
>>>>>>>> a) KXX.agg resulted KTables are always materialized;
>>>>>>>> b) KTable.agg requires the aggregating KTable to always be
>>>>>>>> materialized
>>>>>>>> (otherwise we would not know the old value);
>>>>>>>> c) KTable.join resulted KTables are always materialized, and so are
>>>>>>>> the
>>>>>>>> joining KTables to always be materialized.
>>>>>>>> d) KTable.filter/mapValues resulted KTables materialization depend
>>>>>>>> on
>>>>>>>>
>>>>>>> its
>>>>>
>>>

Re: Streams: Global state & topic multiplication questions

2017-01-20 Thread Michael Noll
As Eno said I'd use the interactive queries API for Q2.

Demo apps:
-
https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java
-
https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExample.java

Further docs:
http://docs.confluent.io/current/streams/developer-guide.html#interactive-queries

(FYI: We have begun moving the info in these docs to the Apache Kafka docs,
too, but it will take a while.)
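
A minimal sketch of what querying such a store could look like, assuming
Kafka Streams 0.10.1+, a running KafkaStreams instance named `streams`, and
that the aggregate was materialized into a store named "pixels-store" with
String keys like "imageid,x,y" (all of these names are made up):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

ReadOnlyKeyValueStore<String, String> store =
    streams.store("pixels-store", QueryableStoreTypes.keyValueStore());

// Point lookup for a single pixel of image 42 ...
String color = store.get("42,10,17");

// ... or scan the store and keep only the pixels of image 42 (a client-side
// prefix filter, since the keys in this sketch are plain Strings).
try (KeyValueIterator<String, String> it = store.all()) {
  while (it.hasNext()) {
    KeyValue<String, String> pixel = it.next();
    if (pixel.key.startsWith("42,")) {
      // send this pixel to the newly connected user
    }
  }
}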

-Michael




On Thu, Jan 19, 2017 at 5:23 PM, Eno Thereska 
wrote:

> For Q2: one way to export the state on demand would be to use the
> Interactive Queries API
> (https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/).
> That can be seen as a current, materialized
> view of the state. There is some example code in the blog.
>
> Eno
>
>
> > On 19 Jan 2017, at 16:11, Peter Kopias  wrote:
> >
> > Q1: Thank you, the branch() is what I'm looking for, I just missed it
> > somehow.
> >
> > Q2:
> >   I receive something like "imageid,x,y" as key, and a color as value. I
> > aggregate this to something like average color for example.
> >  So technically I do not have images, I have colored pixels with 3
> > dimensions one being the image...
> >  And then my newcomer user wants to join the fun, so we'd need to serve
> > him an image with the latest current state (all pixels of imageid=x), and
> > of course everything that comes later (updates on the output of the
> > aggregate, but filtered to that imageid).
> >
> > Problem is how to get the stream API node to "export" part of its
> > current state on demand. (All imageid=x keys with values).
> >
> > There could be a "request topic" that I could join together with the
> > aggregated ktable maybe?
> >
> > Other problem is following the updates without getting the details of
> > other images (99% of records are not interesting for the specific user).
> >
> > Thanks,
> >
> > Peter
> >
> >
> >
> > On Thu, Jan 19, 2017 at 4:55 PM, Eno Thereska 
> > wrote:
> >
> >> Hi Peter,
> >>
> >> About Q1: The DSL has the "branch" API, where one stream is branched to
> >> several streams, based on a predicate. I think that could help.
> >>
> >> About Q2: I'm not entirely sure I understand the problem space. What is
> >> the definition of a "full image"?
> >>
> >> Thanks
> >> Eno
> >>> On 19 Jan 2017, at 12:07, Peter Kopias  wrote:
> >>>
> >>> Greetings Everyone,
> >>>
> >>> I'm just getting into the kafka world with a sample project, and I've
> got
> >>> two conceptional issues, you might have a trivial answer already at
> hand
> >> to.
> >>>
> >>> Scenario: multiuser painting webapp, with N user working on M images
> >>> simultaneously.   The "brush" events go to one single kafka topic, in a
> >>> format: imageid,x,y -> brushevent  , that I aggregate to imageid,x,y
> >>>
> >>> Q1:
> >>> It would be nice to separate the stream to M output topics, so that
> would
> >>> work nice as "partitioning", and also we could just subscribe to update
> >>> events of a specific image maybe. How can I fan out the records to
> >>> different (maybe not yet existing) topics by using DSL?
> >>>
> >>> Is that a good idea? (If I can solve every processing in a common
> >>> processing graph that would be the best, but then I'd need some high
> >>> performance solution of filtering out the noise, as the subscribers are
> >>> only interested in a very small subset of the soup.)
> >>>
> >>> Q2:
> >>> - When a new user comes, I'd like give him the latest full image?
> >>> (I could do a "fullimages" output topic, but then also comes the
> problem
> >>> of serious overhead on each incoming update, and also the newcomer
> should
> >>> somehow only get the image he's interested in, not read all the images,
> >> and
> >>> ignore the others.)
> >>>
> >>> I know I'm still new to this, but I'd like to learn the best practices
> >> you
> >>> might already tried.
> >>>
> >>> Thank you,
> >>>
> >>> Peter
> >>
> >>
>
>


Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

2017-01-18 Thread Michael Noll
Nicolas,

here's some information I shared on StackOverflow (perhaps a bit outdated
by now, was back in Aug 2016) about how you can add a state store when
using KStreamBuilder: http://stackoverflow.com/a/39086805/1743580
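
Condensed into a sketch, the general wiring looks like this (assuming Kafka
0.10.x; the store/topic names and the DeduplicationTransformer class are
placeholders -- the latter would be your own Transformer implementation with
a no-arg constructor):

import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

KStreamBuilder builder = new KStreamBuilder();

// 1. Register the store with the builder.
StateStoreSupplier dedupStore = Stores.create("dedup-store")
    .withStringKeys()
    .withStringValues()
    .persistent()
    .build();
builder.addStateStore(dedupStore);

// 2. Reference the store by name in transform(); the Transformer can then
//    look it up via ProcessorContext#getStateStore("dedup-store") in init().
builder.<String, String>stream("input-topic")
    .transform(DeduplicationTransformer::new, "dedup-store")
    .to("output-topic");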

-Michael




On Wed, Jan 18, 2017 at 5:18 PM, Nicolas Fouché  wrote:

> The reason I would not use `KStream.transform()` is that I want to call
> `ProcessorContext.forward()` several times, to different children. These
> children are sinks.
> My use case: I need to route my beacons to different topics. Right now, I
> use a series of `KStream.branch()` calls [1]. But would it be more
> "elegant" to be able to add 5 sinks to a topology, and forward my records
> to them in a custom processor ?
>
> Damian: About `TopologyBuilder.addProcessor(...)`, as far as I know, I
> have
> to give a parent processor. But the parent processor was generated by a
> high-level topologies. And names of processors created by `KStreamBuilder`
> are not accessible. (unless by inspecting the topology nodes I guess)
>
> [1] https://gist.github.com/nfo/c4936a24601352db23b18653a8ccc352
>
> Thanks.
> Nicolas
>
>
> 2017-01-18 15:56 GMT+01:00 Michael Noll :
>
> > Nicolas,
> >
> > if I understand your question correctly you'd like to add further
> > operations after having called `KStream#process()`, which -- as you
> report
> > -- doesn't work because `process()` returns void.
> >
> > If that's indeed the case, +1 to Damian's suggestion to use
> > `KStream.transform()` instead of `KStream.process()`.
> >
> > -Michael
> >
> >
> >
> >
> > On Wed, Jan 18, 2017 at 3:31 PM, Damian Guy 
> wrote:
> >
> > > You could possibly also use KStream.transform(...)
> > >
> > > On Wed, 18 Jan 2017 at 14:22 Damian Guy  wrote:
> > >
> > > > Hi Nicolas,
> > > >
> > > > Good question! I'm not sure why it is a terminal operation, maybe one
> > of
> > > > the original authors can chip in. However, you could probably work
> > around
> > > > it by using TopologyBuilder.addProcessor(...) rather than
> > > KStream.process
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Wed, 18 Jan 2017 at 13:48 Nicolas Fouché 
> > wrote:
> > > >
> > > > Hi,
> > > >
> > > > as far as I understand, calling `KStream.process` prevents the
> > developer
> > > > from adding further operations to a `KStreamBuilder` [1], because its
> > > > return type is `void`. Good.
> > > >
> > > > But it also prevents the developer from adding operations to its
> > > superclass
> > > > `TopologyBuilder`. In my case I wanted to add a sink, and the parent
> of
> > > > this sink would be the name of the Processor that is created by
> > > > `KStream.process`. Is there any reason why this method does not
> return
> > > the
> > > > processor name [2] ? Is it because it would be a bad idea continuing
> > > > building my topology with the low-level API ?
> > > >
> > > > [1]
> > > >
> > > > https://github.com/confluentinc/examples/blob/3.
> > > 1.x/kafka-streams/src/test/java/io/confluent/examples/streams/
> > > MixAndMatchLambdaIntegrationTest.java%23L56
> > > > [2]
> > > >
> > > > https://github.com/apache/kafka/blob/b6011918fbc36bfaa465bdcc750e24
> > > 35985d9101/streams/src/main/java/org/apache/kafka/streams/
> > > kstream/internals/KStreamImpl.java#L391
> > > >
> > > >
> > > > Thanks.
> > > > Nicolas.
> > > >
> > > >
> > >
> >
>


Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

2017-01-18 Thread Michael Noll
Nicolas,

if I understand your question correctly you'd like to add further
operations after having called `KStream#process()`, which -- as you report
-- doesn't work because `process()` returns void.

If that's indeed the case, +1 to Damian's suggestion to use
`KStream.transform()` instead of `KStream.process()`.

-Michael




On Wed, Jan 18, 2017 at 3:31 PM, Damian Guy  wrote:

> You could possibly also use KStream.transform(...)
>
> On Wed, 18 Jan 2017 at 14:22 Damian Guy  wrote:
>
> > Hi Nicolas,
> >
> > Good question! I'm not sure why it is a terminal operation, maybe one of
> > the original authors can chip in. However, you could probably work around
> > it by using TopologyBuilder.addProcessor(...) rather than
> KStream.process
> >
> > Thanks,
> > Damian
> >
> > On Wed, 18 Jan 2017 at 13:48 Nicolas Fouché  wrote:
> >
> > Hi,
> >
> > as far as I understand, calling `KStream.process` prevents the developer
> > from adding further operations to a `KStreamBuilder` [1], because its
> > return type is `void`. Good.
> >
> > But it also prevents the developer from adding operations to its
> superclass
> > `TopologyBuilder`. In my case I wanted to add a sink, and the parent of
> > this sink would be the name of the Processor that is created by
> > `KStream.process`. Is there any reason why this method does not return
> the
> > processor name [2] ? Is it because it would be a bad idea continuing
> > building my topology with the low-level API ?
> >
> > [1]
> >
> > https://github.com/confluentinc/examples/blob/3.
> 1.x/kafka-streams/src/test/java/io/confluent/examples/streams/
> MixAndMatchLambdaIntegrationTest.java%23L56
> > [2]
> >
> > https://github.com/apache/kafka/blob/b6011918fbc36bfaa465bdcc750e24
> 35985d9101/streams/src/main/java/org/apache/kafka/streams/
> kstream/internals/KStreamImpl.java#L391
> >
> >
> > Thanks.
> > Nicolas.
> >
> >
>


Re: Kafka Streams: from a KStream, aggregating records with the same key and updated metrics ?

2017-01-17 Thread Michael Noll
Thanks for sharing back your findings/code, Nicolas!

-Michael


On Mon, Jan 16, 2017 at 11:15 PM, Nicolas Fouché  wrote:

> If anyone is interested, here is my custom timestamp extractor:
> https://gist.github.com/nfo/54d5830720e163d2e7e848b6e4baac20 .
>
> 2017-01-16 15:52 GMT+01:00 Nicolas Fouché :
>
> > Hi Michael,
> >
> > got it. I understand that it would be less error-prone to generate the
> > final "altered" timestamp on the Producer side, instead of trying to
> > compute it each time the record is consumed.
> >
> > Thanks.
> > Nicolas.
> >
> > 2017-01-16 10:03 GMT+01:00 Michael Noll :
> >
> >> Nicolas,
> >>
> >> quick feedback on timestamps:
> >>
> >> > In our system, clients send data to an HTTP API. This API produces the
> >> > records in Kafka. I can't rely on the clock of the clients sending the
> >> > original data, (so the records' timestamps are set by the servers
> >> ingesting
> >> > the records in Kafka), but I can rely on a time difference. The client
> >> only
> >> > gives information about the time spent since the first version of the
> >> > record was sent. Via a custom timestamp extractor, I just need to
> >> subtract
> >> > the time spent to the record's timestamp to ensure that it will fall
> in
> >> > same window.
> >>
> >> Alternatively, you can also let the HTTP API handle the timestamp
> >> calculations, and then embed the "final" timestamp in the message
> payload
> >> (e.g. in the message value).  Then, in your downstream application, you'd
> >> define a custom timestamp extractor that returns this embedded
> timestamp.
> >>
> >> One advantage of the approach I outlined above is that other consumers
> of
> >> the same data (who may or may not be aware of how you need to compute a
> >> timestamp diff to get the "real" timestamp) can simply re-use the
> >> timestamp
> >> embedded in the payload without having to know/worry about the custom
> >> calculation.  It might also be easier for Ops personnel to have access
> to
> >> a
> >> ready-to-use timestamp in case they need to debug or troubleshoot.
> >>
> >> -Michael
> >>
> >>
> >>
> >>
> >> On Sun, Jan 15, 2017 at 11:10 PM, Nicolas Fouché 
> >> wrote:
> >>
> >> > Hi Eno,
> >> >
> >> > 2. Well, records could arrive out of order. But it should happen
> rarely,
> >> > and it's no big deal anyway. So let's forget about the version number
> >> if it
> >> > makes things easier !
> >> >
> >> > 3. I completely missed out on KTable aggregations. Thanks a lot for
> the
> >> > pointer, that opens new perspectives.
> >> >
> >> > ... a few hours pass ...
> >> >
> >> > Ok, in my case, since my input is an infinite stream of new records, I
> >> > would have to "window" my KTables, right ?
> >> > With `KStream.groupBy().reduce()`, I can generate a windowed KTable of
> >> > records, and even use the reducer function to compare the version
> >> numbers.
> >> > Next, I use `KTable.groupBy().aggregate()` to benefit from the `adder`
> >> and
> >> > `substractor` mechanisms [1].
> >> >
> >> > The last problem is about the record timestamp. If I work on a
> one-hour
> >> > window, and records are sent between let's say 00:59 and 01:01, they
> >> would
> >> > live in two different KTables and this would create duplicates.
> >> > To deal with this, I could mess with the records timestamps, so each
> new
> >> > record version is considered by Kafka Streams having the same
> timestamp
> >> > than the first version seen by the producer.
> >> > Here is my idea:
> >> > In our system, clients send data to an HTTP API. This API produces the
> >> > records in Kafka. I can't rely on the clock of the clients sending the
> >> > original data, (so the records' timestamps are set by the servers
> >> ingesting
> >> > the records in Kafka), but I can rely on a time difference. The client
> >> only
> >> > gives information about the time spent since the first version of the
> >> > record was sent. Via a custom timestamp extractor, I just need to
> >> subtract
> >> > the time spent 

Re: Kafka Streams: from a KStream, aggregating records with the same key and updated metrics ?

2017-01-16 Thread Michael Noll
Nicolas,

quick feedback on timestamps:

> In our system, clients send data to an HTTP API. This API produces the
> records in Kafka. I can't rely on the clock of the clients sending the
> original data, (so the records' timestamps are set by the servers
ingesting
> the records in Kafka), but I can rely on a time difference. The client
only
> gives information about the time spent since the first version of the
> record was sent. Via a custom timestamp extractor, I just need to subtract
> the time spent to the record's timestamp to ensure that it will fall in
> same window.

Alternatively, you can also let the HTTP API handle the timestamp
calculations, and then embed the "final" timestamp in the message payload
(e.g. in the message value).  Then, in your downstream application, you'd
define a custom timestamp extractor that returns this embedded timestamp.

One advantage of the approach I outlined above is that other consumers of
the same data (who may or may not be aware of how you need to compute a
timestamp diff to get the "real" timestamp) can simply re-use the timestamp
embedded in the payload without having to know/worry about the custom
calculation.  It might also be easier for Ops personnel to have access to a
ready-to-use timestamp in case they need to debug or troubleshoot.
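
A minimal sketch of such an extractor, assuming Kafka 0.10.1 (where
TimestampExtractor has a single-argument extract()); the MyEvent payload type
and its eventTimeMs field are placeholders for whatever your HTTP API embeds:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class PayloadTimestampExtractor implements TimestampExtractor {

  @Override
  public long extract(final ConsumerRecord<Object, Object> record) {
    final Object value = record.value();
    if (value instanceof MyEvent) {  // MyEvent is a placeholder payload type
      final long embedded = ((MyEvent) value).eventTimeMs();
      if (embedded > 0) {
        return embedded;             // the timestamp set by the HTTP API
      }
    }
    return record.timestamp();       // fall back to the record's own timestamp
  }
}

You would then register it via the timestamp.extractor config, e.g.
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
PayloadTimestampExtractor.class);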

-Michael




On Sun, Jan 15, 2017 at 11:10 PM, Nicolas Fouché  wrote:

> Hi Eno,
>
> 2. Well, records could arrive out of order. But it should happen rarely,
> and it's no big deal anyway. So let's forget about the version number if it
> makes things easier !
>
> 3. I completely missed out on KTable aggregations. Thanks a lot for the
> pointer, that opens new perspectives.
>
> ... a few hours pass ...
>
> Ok, in my case, since my input is an infinite stream of new records, I
> would have to "window" my KTables, right ?
> With `KStream.groupBy().reduce()`, I can generate a windowed KTable of
> records, and even use the reducer function to compare the version numbers.
> Next, I use `KTable.groupBy().aggregate()` to benefit from the `adder` and
> `substractor` mechanisms [1].
>
> The last problem is about the record timestamp. If I work on a one-hour
> window, and records are sent between let's say 00:59 and 01:01, they would
> live in two different KTables and this would create duplicates.
> To deal with this, I could mess with the records timestamps, so each new
> record version is considered by Kafka Streams having the same timestamp
> than the first version seen by the producer.
> Here is my idea:
> In our system, clients send data to an HTTP API. This API produces the
> records in Kafka. I can't rely on the clock of the clients sending the
> original data, (so the records' timestamps are set by the servers ingesting
> the records in Kafka), but I can rely on a time difference. The client only
> gives information about the time spent since the first version of the
> record was sent. Via a custom timestamp extractor, I just need to subtract
> the time spent to the record's timestamp to ensure that it will fall in
> same window.
> Long text, small code:
> https://gist.github.com/nfo/6df4d1076af9da5fd1c29b0ad4564f2a .What do you
> think ?
>
> About the windowed KTables in the first step, I guess I should avoid making
> them too long, since they store the whole records. We usually aggregate
> with windows size from 1 hour to 1 month. I should compute all the
> aggregates covering more than 1 hour from the 1-hour aggregates, right ?
>
> [1]
> http://docs.confluent.io/3.1.1/streams/javadocs/org/apache/
> kafka/streams/kstream/KGroupedTable.html#aggregate(
> org.apache.kafka.streams.kstream.Initializer,%20org.
> apache.kafka.streams.kstream.Aggregator,%20org.apache.
> kafka.streams.kstream.Aggregator,%20org.apache.kafka.common.serialization.
> Serde,%20java.lang.String)
>
> Thanks (a lot).
> Nicolas
>
>
> 2017-01-13 17:32 GMT+01:00 Eno Thereska :
>
> > Hi Nicolas,
> >
> > There is a lot here, so let's try to split the concerns around some
> themes:
> >
> > 1. The Processor API is flexible and can definitely do what you want, but
> > as you mentioned, at the cost of you having to manually craft the code.
> > 2. Why are the versions used? I sense there is concern about records
> > arriving out of order so the versions give each record with the same ID
> an
> > order. Is that correct?
> > 3. If you didn't have the version and the count requirement I'd say using
> > a KTable to interpret the stream and then aggregating on that would be
> > sufficient. There might be a way to do that with a mixture of the DSL and
> > the processor API.
> >
> > Another alternative might be to use the Interactive Query APIs
> > (https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/)
> > to first get all your data in KTables and then query it periodically (you
> > can decide on the frequency manually)

Re: [ANNOUNCE] New committer: Grant Henke

2017-01-16 Thread Michael Noll
My congratulations, Grant -- more work's awaiting you then. ;-)

Best wishes,
Michael


On Fri, Jan 13, 2017 at 2:50 PM, Jeff Holoman  wrote:

> Well done Grant!  Congrats!
>
> On Thu, Jan 12, 2017 at 1:13 PM, Joel Koshy  wrote:
>
> > Hey Grant - congrats!
> >
> > On Thu, Jan 12, 2017 at 10:00 AM, Neha Narkhede 
> wrote:
> >
> > > Congratulations, Grant. Well deserved!
> > >
> > > On Thu, Jan 12, 2017 at 7:51 AM Grant Henke 
> wrote:
> > >
> > > > Thanks everyone!
> > > >
> > > > On Thu, Jan 12, 2017 at 2:58 AM, Damian Guy 
> > > wrote:
> > > >
> > > > > Congratulations!
> > > > >
> > > > > On Thu, 12 Jan 2017 at 03:35 Jun Rao  wrote:
> > > > >
> > > > > > Grant,
> > > > > >
> > > > > > Thanks for all your contribution! Congratulations!
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Wed, Jan 11, 2017 at 2:51 PM, Gwen Shapira  >
> > > > wrote:
> > > > > >
> > > > > > > The PMC for Apache Kafka has invited Grant Henke to join as a
> > > > > > > committer and we are pleased to announce that he has accepted!
> > > > > > >
> > > > > > > Grant contributed 88 patches, 90 code reviews, countless great
> > > > > > > comments on discussions, a much-needed cleanup to our protocol
> > and
> > > > the
> > > > > > > on-going and critical work on the Admin protocol. Throughout
> > this,
> > > he
> > > > > > > displayed great technical judgment, high-quality work and
> > > willingness
> > > > > > > to contribute where needed to make Apache Kafka awesome.
> > > > > > >
> > > > > > > Thank you for your contributions, Grant :)
> > > > > > >
> > > > > > > --
> > > > > > > Gwen Shapira
> > > > > > > Product Manager | Confluent
> > > > > > > 650.450.2760 | @gwenshap
> > > > > > > Follow us: Twitter | blog
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Grant Henke
> > > > Software Engineer | Cloudera
> > > > gr...@cloudera.com | twitter.com/gchenke |
> linkedin.com/in/granthenke
> > > >
> > > --
> > > Thanks,
> > > Neha
> > >
> >
>


Re: kafka streams consumer partition assignment is uneven

2017-01-09 Thread Michael Noll
What does the processing topology of your Kafka Streams application look
like, and what's the exact topic and partition configuration?  You say you
have 12 partitions in your cluster, presumably across 7 topics -- that
means that most topics have just a single partition.  Depending on your
topology (e.g. if you have defined that single-partition topics A, B, C
must be joined), Kafka Streams is forced to let one of your three Streams
nodes process "more" topics/partitions than the other two nodes.

-Michael



On Mon, Jan 9, 2017 at 6:52 PM, Ara Ebrahimi 
wrote:

> Hi,
>
> I have 3 kafka brokers, each with 4 disks. I have 12 partitions. I have 3
> kafka streams nodes. Each is configured to have 4 streaming threads. My
> topology is quite complex and I have 7 topics and lots of joins and states.
>
> What I have noticed is that each of the 3 kafka streams nodes gets
> configured to process variables number of partitions of a topic. One node
> is assigned to process 2 partitions of topic a and another one gets
> assigned 5. Hence I end up with nonuniform throughput across these nodes.
> One node ends up processing more data than the other.
>
> What’s going on? How can I make sure partitions assignment to kafka
> streams nodes is uniform?
>
> On a similar topic, is there a way to make sure partition assignment to
> disks across kafka brokers is also uniform? Even if I use a round-robin one
> to pin partitions to broker, but there doesn’t seem to be a way to
> uniformly pin partitions to disks. Or maybe I’m missing something here? I
> end up with 2 partitions of topic a on disk 1 and 3 partitions of topic a
> on disk 2. It’s a bit variable. Not totally random, but it’s not uniformly
> distributed either.
>
> Ara.
>
>
>
> 
>
> This message is for the designated recipient only and may contain
> privileged, proprietary, or otherwise confidential information. If you have
> received it in error, please notify the sender immediately and delete the
> original. Any other use of the e-mail by you is prohibited. Thank you in
> advance for your cooperation.
>
> 
>


Re: Kafka Logo as HighRes or Vectorgraphics

2016-12-02 Thread Michael Noll
Jan,

Here are vector files for the logo. One of our teammates went ahead and
helped resize it to fit nicely into a 2x4m board with 15cm of margin all
around.

Note: I was told to kindly remind you (and other readers of this) to follow
the Apache branding guidelines for the logo, and to please not manipulate the
vector in any way other than proportionally scaling it as needed.

Minimal version: https://drive.google.com/file/d/0Bxy-
BmVSBalmNFY5YzdnY001ckU/view?usp=sharing
Full version (with "Apache"): https://drive.google.com/file/d/0Bxy-
BmVSBalmdHRUVHgxRHl2ejQ/view?usp=sharing

Hope this helps,
Michael




On Fri, Dec 2, 2016 at 10:04 AM, Jan Filipiak 
wrote:

> Hi,
>
> I was just pointed to this. https://www.vectorlogo.zone/lo
> gos/apache_kafka/
> if someone else is looking for the same thing! thanks a lot
>
> Best Jan
>
>
> On 01.12.2016 13:05, Jan Filipiak wrote:
>
>> Hi Everyone,
>>
>> we want to print some big banners of the Kafka logo to decorate our
>> offices. Can anyone help me find a version
>> of the kafka logo that would still look nice printed onto 2x4m flags?
>> Highly appreciated!
>>
>> Best Jan
>>
>
>


Re: I need some help with the production server architecture

2016-12-01 Thread Michael Noll
+1 to what Dave said.



On Thu, Dec 1, 2016 at 4:29 PM, Tauzell, Dave 
wrote:

> For low volume, ZooKeeper doesn't seem to use many resources. I would put
> it on the nodejs servers, as they will have less IO, and heavy IO could
> impact ZooKeeper. Or, you could put some ZK nodes on nodejs and some on DB
> servers to hedge your bets. As always, you'll find out a lot once you
> actually start running it in production.
>
> -Dave
>
> -Original Message-
> From: Sachin Mittal [mailto:sjmit...@gmail.com]
> Sent: Thursday, December 1, 2016 6:03 AM
> To: users@kafka.apache.org
> Subject: Re: I need some help with the production server architecture
>
> Folks any help on this.
>
> Just to put it in simple terms, since we have limited resources available
> to us, which is the better option?
> 1. Run ZooKeeper on the servers running the nodejs web servers or on the db
> servers?
> 2. What about the Kafka brokers?
>
> Thanks
> Sachin
>
>
> On Tue, Nov 29, 2016 at 1:06 PM, Sachin Mittal  wrote:
>
> > Hi,
> > Some time back I was informed on this group that in production we should
> > never run Kafka and ZooKeeper on the same physical machine. So based on
> > that I have a question on how to divide the server nodes we have to run
> > ZooKeeper and Kafka brokers.
> >
> > I have a following setup
> > Data center 1
> > Lan 1 (3 VMs)
> > 192.168.xx.yy1
> > 192.168.xx.yy2
> > 192.168.xx.yy3
> > Right now here we are running a cluster of 3 nodejs web servers.
> > These collect data from web and write to kafka queue. Each VM has 70
> > GB of space.
> >
> > Lan 2 (3 VMs)
> > 192.168.zz.aa1
> > 192.168.zz.aa2
> > 192.168.zz.aa3
> > These serve as the cluster of our database servers. Each VM has 400
> > GB of space.
> >
> > Data center 2
> > Lan 1 (3 VMs)
> > 192.168.yy.bb1
> > 192.168.yy.bb2
> > 192.168.yy.bb3
> > Three new machines where we plan to run a cluster of a new database to
> > serve as the sink of our Kafka Streams applications. Each VM has 400 GB of
> space.
> > These have connectivity only to Lan 2 of data center 1, with a
> > 100MBs data transfer rate.
> >
> > Each VM has a 4 core processor and 16 GB of RAM. They all run linux.
> >
> > Now I would like my topics to be replicated with a factor of 3. Since
> > we don't foresee much volume of data, I don't want them to be partitioned.
> >
> > Also we would like one server to be used as streaming application
> > server, where we can run one or more kafka stream applications to
> > process the topics and write to the new database.
> >
> >  So please let me know what is a suitable division to run brokers and
> > zookeeper.
> >
> >
> > Thanks
> > Sachin
> >
> >
> >
> >
> This e-mail and any files transmitted with it are confidential, may
> contain sensitive information, and are intended solely for the use of the
> individual or entity to whom they are addressed. If you have received this
> e-mail in error, please notify the sender by reply e-mail immediately and
> destroy all copies of the e-mail and any attachments.
>


Re: Interactive Queries

2016-11-28 Thread Michael Noll
There are also some examples/demo applications at
https://github.com/confluentinc/examples that demonstrate the use of
interactive queries:

-
https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java

-
https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExample.java

Note: The `3.1.x` branch is for Kafka 0.10.1.
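
For a quick flavor of the API: a local key-value store can be queried roughly
like this (a sketch only; "counts-store" is a hypothetical store name that
must exist in your topology, with String keys and Long values assumed).

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public class InteractiveQueriesSketch {
      // Queries the local (this instance's) portion of the store "counts-store".
      public static void queryLocalStore(KafkaStreams streams) {
        ReadOnlyKeyValueStore<String, Long> store =
            streams.store("counts-store",
                          QueryableStoreTypes.<String, Long>keyValueStore());
        Long count = store.get("some-key");  // point query
        System.out.println("some-key -> " + count);
        try (KeyValueIterator<String, Long> range = store.range("a", "z")) {
          while (range.hasNext()) {          // range query
            KeyValue<String, Long> next = range.next();
            System.out.println(next.key + " -> " + next.value);
          }
        }
      }
    }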

-Michael




On Sun, Nov 27, 2016 at 3:35 AM, David Garcia  wrote:

> I would start here: http://docs.confluent.io/3.1.0/streams/index.html
>
>
> On 11/26/16, 8:27 PM, "Alan Kash"  wrote:
>
> Hi,
>
> New to Kafka land.
>
> I am looking into the Interactive Queries feature, which transforms topics
> into
> tables with history. Neat!
>
> 1. What kind of queries can we run on the store? Point or range?
> 2. Is indexing supported? Primary or secondary?
> 3. Query language - SQL? Custom Java native query?
>
> I see rocksdb is the persistent layer.
>
> Did the team look at JCache API (JSR 107) -
> https://jcp.org/en/jsr/detail?id=107 ?
>
> Thanks,
> Alan
>
>
>


Re: Data (re)processing with Kafka (new wiki page)

2016-11-25 Thread Michael Noll
Thanks a lot, Matthias!

I have already begun to provide feedback.

-Michael



On Wed, Nov 23, 2016 at 11:41 PM, Matthias J. Sax 
wrote:

> Hi,
>
> we added a new wiki page that is supposed to collect data (re)processing
> scenarios with Kafka:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Data+(Re)
> Processing+Scenarios
>
> We already added a couple of scenarios we think might be common and want
> to invite all of you to add more. This helps to get a better overview of
> the requirements to enable new use cases.
>
> We are looking forward to your feedback!
>
>
> -Matthias
>
>


Re: KafkaStreams KTable#through not creating changelog topic

2016-11-25 Thread Michael Noll
; > > > My point is that it would help me (and maybe others), if the API of
> > > KTable
> > > > was extended to have a new method that does two things that is not
> part
> > > of
> > > > the implementation of .through(). 1) Create a state store AND the
> > > changelog
> > > > topic 2) follow the Kafka Streams naming convention for changelog
> > topics.
> > > > Basically, I want to have a method that does what .through() is
> > supposed
> > > to
> > > > do according to the documentation, but without the "topic" parameter.
> > > >
> > > > What do you think, would it be possible to extend the API with a
> method
> > > > like that?
> > > >
> > > > Thanks,
> > > > Mikael
> > > >
> > > > On Wed, Nov 23, 2016 at 4:16 PM Michael Noll 
> > > wrote:
> > > >
> > > >> Mikael,
> > > >>
> > > >> regarding your second question:
> > > >>
> > > >>> 2) Regarding the use case, the topology looks like this:
> > > >>>
> > > >>> .stream(...)
> > > >>> .aggregate(..., "store-1")
> > > >>> .mapValues(...)
> > > >>> .through(..., "store-2")
> > > >>
> > > >> The last operator above would, without "..." ellipsis, be sth like
> > > >> `KTable#through("through-topic", "store-2")`.  Here,
> "through-topic"
> > is
> > > the
> > > >> changelog topic for both the KTable and the state store "store-2".
> So
> > > this
> > > >> is the changelog topic name that you want to know.
> > > >>
> > > >> - If you want the "through" topic to have a `-changelog` suffix,
> then
> > > you'd
> > > >> need to add that yourself in the call to `through(...)`.
> > > >>
> > > >> - If you wonder why `through()` doesn't add a `-changelog` suffix
> > > >> automatically:  That's because `through()` -- like `to()` or
> > `stream()`,
> > > >> `table()` -- require you to explicitly provide a topic name, and of
> > > course
> > > >> Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is
> > > only
> > > >> added when Kafka creates internal changelog topics behind the scenes
> > for
> > > >> you.)   Unfortunately, the javadocs of `KTable#through()` is
> incorrect
> > > >> because it refers to `-changelog`;  we'll fix that as mentioned
> above.
> > > >>
> > > >> - Also, in case you want to do some shenanigans (like for some
> tooling
> > > >> you're building around state stores/changelogs/interactive queries)
> > such
> > > >> detecting all state store changelogs by doing the equivalent of `ls
> > > >> *-changelog`, then this will miss changelogs of KTables that are
> > > created by
> > > >> `through()` and `to()` (unless you come up with a naming convention
> > that
> > > >> your tooling can assume to be in place, e.g. by always adding
> > > `-changelog`
> > > >> to topic names when you call `through()`).
> > > >>
> > > >> I hope this helps!
> > > >> Michael
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <
> hoegqv...@gmail.com
> > >
> > > >> wrote:
> > > >>
> > > >>> Hi Eno,
> > > >>>
> > > >>> 1) Great :)
> > > >>>
> > > >>> 2) Yes, we are using the Interactive Queries to access the state
> > > stores.
> > > >> In
> > > >>> addition, we access the changelogs to subscribe to updates. For
> this
> > > >> reason
> > > >>> we need to know the changelog topic name.
> > > >>>
> > > >>> Thanks,
> > > >>> Mikael
> > > >>>
> > > >>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <
> eno.there...@gmail.com
> > >
> > > >>> wrote:
> > > >>>
> > > >>>> HI Mikael,
> > > >>>>
> > > >>>> 1) The JavaDoc looks inc

Re: KafkaStreams KTable#through not creating changelog topic

2016-11-23 Thread Michael Noll
> - Also, in case you want to do some shenanigans (like for some tooling
you're building
> around state stores/changelogs/interactive queries) such as detecting all
state store changelogs
> by doing the equivalent of `ls *-changelog`, then this will miss
changelogs of KTables that are
> created by `through()` and `to()` [...]

Addendum: And that's because the topic that is created by
`KTable#through()` and `KTable#to()` is, by definition, a changelog of that
KTable and the associated state store.



On Wed, Nov 23, 2016 at 4:15 PM, Michael Noll  wrote:

> Mikael,
>
> regarding your second question:
>
> > 2) Regarding the use case, the topology looks like this:
> >
> > .stream(...)
> > .aggregate(..., "store-1")
> > .mapValues(...)
> > .through(..., "store-2")
>
> The last operator above would, without "..." ellipsis, be sth like
> `KTable#through("through-topic", "store-2")`.  Here, "through-topic" is
> the changelog topic for both the KTable and the state store "store-2".  So
> this is the changelog topic name that you want to know.
>
> - If you want the "through" topic to have a `-changelog` suffix, then
> you'd need to add that yourself in the call to `through(...)`.
>
> - If you wonder why `through()` doesn't add a `-changelog` suffix
> automatically:  That's because `through()` -- like `to()` or `stream()`,
> `table()` -- require you to explicitly provide a topic name, and of course
> Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is only
> added when Kafka creates internal changelog topics behind the scenes for
> you.)   Unfortunately, the javadocs of `KTable#through()` is incorrect
> because it refers to `-changelog`;  we'll fix that as mentioned above.
>
> - Also, in case you want to do some shenanigans (like for some tooling
> you're building around state stores/changelogs/interactive queries) such
> detecting all state store changelogs by doing the equivalent of `ls
> *-changelog`, then this will miss changelogs of KTables that are created by
> `through()` and `to()` (unless you come up with a naming convention that
> your tooling can assume to be in place, e.g. by always adding `-changelog`
> to topic names when you call `through()`).
>
> I hope this helps!
> Michael
>
>
>
>
> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist 
> wrote:
>
>> Hi Eno,
>>
>> 1) Great :)
>>
>> 2) Yes, we are using the Interactive Queries to access the state stores.
>> In
>> addition, we access the changelogs to subscribe to updates. For this
>> reason
>> we need to know the changelog topic name.
>>
>> Thanks,
>> Mikael
>>
>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska 
>> wrote:
>>
>> > HI Mikael,
>> >
>> > 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is
>> looking
>> > into fixing it. I agree that it can be confusing to have topic names
>> that
>> > are not what one would expect.
>> >
>> > 2) If your goal is to query/read from the state stores, you can use
>> > Interactive Queries to do that (you don't need to worry about the
>> changelog
>> > topic name and such). Interactive Queries is a new feature in 0.10.1
>> (blog
>> > here:
>> > https://www.confluent.io/blog/unifying-stream-processing-and
>> -interactive-queries-in-apache-kafka/
>> > <
>> > https://www.confluent.io/blog/unifying-stream-processing-and
>> -interactive-queries-in-apache-kafka/
>> > >).
>> >
>> > Thanks
>> > Eno
>> >
>> >
>> > > On 22 Nov 2016, at 19:27, Mikael Högqvist 
>> wrote:
>> > >
>> > > Sorry for being unclear, i'll try again :)
>> > >
>> > > 1) The JavaDoc for through is not correct, it states that a changelog
>> > topic
>> > > will be created for the state store. That is, if I would call it with
>> > > through("topic", "a-store"), I would expect a kafka topic
>> > > "my-app-id-a-store-changelog" to be created.
>> > >
>> > > 2) Regarding the use case, the topology looks like this:
>> > >
>> > > .stream(...)
>> > > .aggregate(..., "store-1")
>> > > .mapValues(...)
>> > > .through(..., "store-2")
>> > >
>> > > Basically, I want to materialize both the result from the aggregate
>> > method
>> > > and the result from mapValues, 

Re: KafkaStreams KTable#through not creating changelog topic

2016-11-23 Thread Michael Noll
Mikael,

regarding your second question:

> 2) Regarding the use case, the topology looks like this:
>
> .stream(...)
> .aggregate(..., "store-1")
> .mapValues(...)
> .through(..., "store-2")

The last operator above would, without "..." ellipsis, be sth like
`KTable#through("through-topic", "store-2")`.  Here, "through-topic" is the
changelog topic for both the KTable and the state store "store-2".  So this
is the changelog topic name that you want to know.

- If you want the "through" topic to have a `-changelog` suffix, then you'd
need to add that yourself in the call to `through(...)`.

- If you wonder why `through()` doesn't add a `-changelog` suffix
automatically:  That's because `through()` -- like `to()`, `stream()`, and
`table()` -- requires you to explicitly provide a topic name, and of course
Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is only
added when Kafka creates internal changelog topics behind the scenes for
you.)   Unfortunately, the javadocs of `KTable#through()` are incorrect
because they refer to `-changelog`;  we'll fix that as mentioned above.

- Also, in case you want to do some shenanigans (like for some tooling
you're building around state stores/changelogs/interactive queries) such as
detecting all state store changelogs by doing the equivalent of `ls
*-changelog`, then this will miss changelogs of KTables that are created by
`through()` and `to()` (unless you come up with a naming convention that
your tooling can assume to be in place, e.g. by always adding `-changelog`
to topic names when you call `through()`).
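
To illustrate the first point above, a rough sketch of mimicking Kafka's
internal naming convention yourself (assuming the two-argument
`through(topic, storeName)` variant discussed in this thread; all names are
made up):

    import org.apache.kafka.streams.kstream.KTable;

    public class ChangelogNamingSketch {
      // Mimics the internal convention "<application.id>-<storeName>-changelog"
      // for a topic that you populate explicitly via KTable#through().
      public static <K, V> KTable<K, V> throughWithChangelogName(
          KTable<K, V> table, String applicationId, String storeName) {
        String topic = applicationId + "-" + storeName + "-changelog";
        // Like any topic passed to through()/to(), it must already exist
        // (or topic auto-creation must be enabled on the brokers).
        return table.through(topic, storeName);
      }
    }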

I hope this helps!
Michael




On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist 
wrote:

> Hi Eno,
>
> 1) Great :)
>
> 2) Yes, we are using the Interactive Queries to access the state stores. In
> addition, we access the changelogs to subscribe to updates. For this reason
> we need to know the changelog topic name.
>
> Thanks,
> Mikael
>
> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska 
> wrote:
>
> > HI Mikael,
> >
> > 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is looking
> > into fixing it. I agree that it can be confusing to have topic names that
> > are not what one would expect.
> >
> > 2) If your goal is to query/read from the state stores, you can use
> > Interactive Queries to do that (you don't need to worry about the
> changelog
> > topic name and such). Interactive Queries is a new feature in 0.10.1
> (blog
> > here:
> > https://www.confluent.io/blog/unifying-stream-processing-
> and-interactive-queries-in-apache-kafka/
> > <
> > https://www.confluent.io/blog/unifying-stream-processing-
> and-interactive-queries-in-apache-kafka/
> > >).
> >
> > Thanks
> > Eno
> >
> >
> > > On 22 Nov 2016, at 19:27, Mikael Högqvist  wrote:
> > >
> > > Sorry for being unclear, i'll try again :)
> > >
> > > 1) The JavaDoc for through is not correct, it states that a changelog
> > topic
> > > will be created for the state store. That is, if I would call it with
> > > through("topic", "a-store"), I would expect a kafka topic
> > > "my-app-id-a-store-changelog" to be created.
> > >
> > > 2) Regarding the use case, the topology looks like this:
> > >
> > > .stream(...)
> > > .aggregate(..., "store-1")
> > > .mapValues(...)
> > > .through(..., "store-2")
> > >
> > > Basically, I want to materialize both the result from the aggregate
> > method
> > > and the result from mapValues, which is materialized using .through().
> > > Later, I will access both the tables (store-1 and store-2) to a) get
> the
> > > current state of the aggregate, b) subscribe to future updates. This
> > works
> > > just fine. The only issue is that I assumed to have a changelog topic
> for
> > > store-2 created automatically, which didn't happen.
> > >
> > > Since I want to access the changelog topic, it helps if the naming is
> > > consistent. So either we enforce the same naming pattern as kafka when
> > > calling .through() or alternatively the Kafka Streams API can provide a
> > > method to materialize tables which creates a topic name according to
> the
> > > naming pattern. E.g. .through() without the topic parameter.
> > >
> > > What do you think?
> > >
> > > Best,
> > > Mikael
> > >
> > > On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax  >
> > > wrote:
> > >
> > >> I cannot completely follow what you want to achieve.
> > >>
> > >> However, the JavaDoc for through() seems not to be correct to me.
> Using
> > >> through() will not create an extra internal changelog topic with the
> > >> described naming schema, because the topic specified in through() can
> be
> > >> used for this (there is no point in duplicating the data).
> > >>
> > >> If you have a KTable and apply a mapValues(), this will not write data
> > >> to any topic. The derived KTable is in-memory because you can easily
> > >> recreate it from its base KTable.
> > >>
> > >> What is the missing part you want to get?
> > >>
> > >> Btw: the internally created changelog topics are only used for
> recovery
> > >> in case

Re: Kafka Streaming message loss

2016-11-21 Thread Michael Noll
Also:  Since your testing is purely local, feel free to share the code you
have been using so that we can try to reproduce what you're observing.

-Michael



On Mon, Nov 21, 2016 at 4:04 PM, Michael Noll  wrote:

> Please don't take this comment the wrong way, but have you double-checked
> whether your counting code is working correctly?  (I'm not implying this
> could be the only reason for what you're observing.)
>
> -Michael
>
>
> On Fri, Nov 18, 2016 at 4:52 PM, Eno Thereska 
> wrote:
>
>> Hi Ryan,
>>
>> Perhaps you could share some of your code so we can have a look? One
>> thing I'd check is if you are using compacted Kafka topics. If so, and if
>> you have non-unique keys, compaction happens automatically and you might
>> only see the latest value for a key.
>>
>> Thanks
>> Eno
>> > On 18 Nov 2016, at 13:49, Ryan Slade  wrote:
>> >
>> > Hi
>> >
>> > I'm trialling Kafka Streaming for a large stream processing job, however
>> > I'm seeing message loss even in the simplest scenarios.
>> >
>> > I've tried to boil it down to the simplest scenario where I see loss
>> which
>> > is the following:
>> > 1. Ingest messages from an input stream (String, String)
>> > 2. Decode message into a type from JSON
>> > 3. If successful, send to a second stream and update an atomic counter.
>> > (String, CustomType)
>> > 4. A foreach on the second stream that updates an AtomicCounter each
>> time a
>> > message arrives.
>> >
>> > I would expect that since we have at least once guarantees that the
>> second
>> > stream would see at least as many messages as were sent to it from the
>> > first, however I consistently see message loss.
>> >
>> > I've tested multiple times sending around 200k messages. I don't see
>> losses
>> > every time, maybe around 1 in 5 runs with the same data. The losses are
>> > small, around 100 messages, but I would expect none.
>> >
>> > I'm running version 0.10.1.0 with Zookeeper, Kafka and the Stream
>> Consumer
>> > all running on the same machine in order to mitigate packet loss.
>> >
>> > I'm running Ubuntu 16.04 with OpenJDK.
>> >
>> > Any advice would be greatly appreciated as I can't move forward with
>> Kafka
>> > Streams as a solution if messages are consistently lost between stream
>> on
>> > the same machine.
>> >
>> > Thanks
>> > Ryan
>>
>>
>


Re: Kafka Streaming message loss

2016-11-21 Thread Michael Noll
Please don't take this comment the wrong way, but have you double-checked
whether your counting code is working correctly?  (I'm not implying this
could be the only reason for what you're observing.)

-Michael


On Fri, Nov 18, 2016 at 4:52 PM, Eno Thereska 
wrote:

> Hi Ryan,
>
> Perhaps you could share some of your code so we can have a look? One thing
> I'd check is if you are using compacted Kafka topics. If so, and if you
> have non-unique keys, compaction happens automatically and you might only
> see the latest value for a key.
>
> Thanks
> Eno
> > On 18 Nov 2016, at 13:49, Ryan Slade  wrote:
> >
> > Hi
> >
> > I'm trialling Kafka Streaming for a large stream processing job, however
> > I'm seeing message loss even in the simplest scenarios.
> >
> > I've tried to boil it down to the simplest scenario where I see loss
> which
> > is the following:
> > 1. Ingest messages from an input stream (String, String)
> > 2. Decode message into a type from JSON
> > 3. If successful, send to a second stream and update an atomic counter.
> > (String, CustomType)
> > 4. A foreach on the second stream that updates an AtomicCounter each
> time a
> > message arrives.
> >
> > I would expect that since we have at least once guarantees that the
> second
> > stream would see at least as many messages as were sent to it from the
> > first, however I consistently see message loss.
> >
> > I've tested multiple times sending around 200k messages. I don't see
> losses
> > every time, maybe around 1 in 5 runs with the same data. The losses are
> > small, around 100 messages, but I would expect none.
> >
> > I'm running version 0.10.1.0 with Zookeeper, Kafka and the Stream
> Consumer
> > all running on the same machine in order to mitigate packet loss.
> >
> > I'm running Ubuntu 16.04 with OpenJDK.
> >
> > Any advice would be greatly appreciated as I can't move forward with
> Kafka
> > Streams as a solution if messages are consistently lost between stream on
> > the same machine.
> >
> > Thanks
> > Ryan
>
>


Re: Kafka windowed table not aggregating correctly

2016-11-21 Thread Michael Noll
On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal  wrote:

> I am using kafka_2.10-0.10.0.1.
> Say I am having a window of 60 minutes advanced by 15 minutes.
> If the stream app using timestamp extractor puts the message in one or more
> bucket(s), it will get aggregated in those buckets.
> I assume this statement is correct.
>

Yes.
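
(For reference, such a hopping window would be defined roughly like this with
the 0.10.1 DSL -- on 0.10.0.x the windows API additionally takes a name.
Store and type names below are made up.)

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    public class HoppingWindowSketch {
      // A 60-minute window advanced by 15 minutes: each record falls into up to
      // four overlapping windows, and each windowed aggregate gets updated.
      public static KTable<Windowed<String>, Long> countPerHour(
          KStream<String, String> events) {
        return events
            .groupByKey()
            .count(TimeWindows.of(TimeUnit.MINUTES.toMillis(60))
                              .advanceBy(TimeUnit.MINUTES.toMillis(15)),
                   "event-counts");
      }
    }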



>
> Also say when I restart the streams application then bucket aggregation
> will resume from last point of halt.
> I hope this is also correct.
>

Yes.



> What I noticed is that once a message was placed in one bucket, that bucket
> was not getting new messages.
>

This should not happen...


> However, when I ran a small test case replicating that, it works
> properly. There may be some issues with the application reset.
>

...and apparently it works (as expected) in your small test case.

Do you have any further information that you could share with us so we can
help you better?  What's the difference, for example, between your "normal"
use case and the small test case you have been referring to?


-Michael


Re: Kafka Streams internal topic naming

2016-11-18 Thread Michael Noll
Srikanth,

as Matthias said, you can achieve some namespacing effects through the use
of (your own in-house) conventions of defining `application.id` across
teams.  The id is used as the prefix for topics, see
http://docs.confluent.io/current/streams/developer-guide.html#required-configuration-parameters
for further details.
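
For example (a sketch; the application.id value and the exact internal topic
names below are illustrative only):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class AppIdNamingSketch {
      public static Properties streamsConfig() {
        Properties props = new Properties();
        // Encode your in-house convention (e.g. "<team>-<project>") here.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "billing-fraud-detector");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        // Internal topics are then prefixed with the application.id, e.g.
        //   billing-fraud-detector-<storeName>-changelog
        //   billing-fraud-detector-<operatorName>-repartition
        return props;
      }
    }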

Best,
Michael



On Thu, Nov 17, 2016 at 6:30 PM, Matthias J. Sax 
wrote:

> The only way to influence the naming is via application.id which you can
> set as you wish. Hope this is good enough to meet your naming conventions.
>
> As Michael mentioned, there is no way to manually specify internal topic
> names right now.
>
> -Matthias
>
> On 11/17/16 8:45 AM, Srikanth wrote:
> > That is right, Michael. Most teams that use the Kafka library can adhere to
> > a certain naming convention.
> > Using the Streams API will break that.
> >
> > Srikanth
> >
> > On Wed, Nov 16, 2016 at 2:32 PM, Michael Noll 
> wrote:
> >
> >> Srikanth,
> >>
> >> no, there's isn't any API to control the naming of internal topics.
> >>
> >> Is the reason you're asking for such functionality only/mostly about
> >> multi-tenancy issues (as you mentioned in your first message)?
> >>
> >> -Michael
> >>
> >>
> >>
> >> On Wed, Nov 16, 2016 at 8:20 PM, Srikanth 
> wrote:
> >>
> >>> Hello,
> >>>
> >>> Does kafka stream provide an API to control how internal topics are
> >> named?
> >>> Right now it uses appId, operator name, etc.
> >>> In a shared kafka cluster its common to have naming convention that may
> >>> require some prefix/suffix.
> >>>
> >>> Srikanth
> >>>
> >>
> >
>
>


-- 
*Michael G. Noll*
Product Manager | Confluent
+1 650 453 5860 | @miguno <https://twitter.com/miguno>
Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog
<http://www.confluent.io/blog>


Re: Kafka Streams internal topic naming

2016-11-16 Thread Michael Noll
Srikanth,

no, there's isn't any API to control the naming of internal topics.

Is the reason you're asking for such functionality only/mostly about
multi-tenancy issues (as you mentioned in your first message)?

-Michael



On Wed, Nov 16, 2016 at 8:20 PM, Srikanth  wrote:

> Hello,
>
> Does Kafka Streams provide an API to control how internal topics are named?
> Right now it uses appId, operator name, etc.
> In a shared Kafka cluster it's common to have a naming convention that may
> require some prefix/suffix.
>
> Srikanth
>


Re: Process KTable on Predicate

2016-11-15 Thread Michael Noll
Nick,

if I understand you correctly you can already do this today:

Think: KTable.toStream().filter().foreach() (or just
KTable.filter().foreach(), depending on what you are aiming to do)
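
A minimal sketch of what I mean (hypothetical key/value types and predicate):

    import org.apache.kafka.streams.kstream.KTable;

    public class WhenSketch {
      // Triggers a side effect only for records that match the predicate;
      // the KTable itself is left untouched.
      public static void attachConditionalAction(KTable<String, Long> myKtable) {
        myKtable.toStream()
                .filter((key, value) -> value != null && value > 100L)  // your predicate
                .foreach((key, value) ->
                    System.out.println("triggered for " + key + " -> " + value));
      }
    }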

Would that work for you?



On Sun, Nov 13, 2016 at 12:12 AM, Nick DeCoursin 
wrote:

> Feature proposal:
>
> KTable when(predicate);
>
> I have a KTable, but I'd only like to trigger a stream processor on certain
> conditions. I'd like to do something like:
>
> myKtable.when(
>  (key, value) -> some predicate
> );
>
> The result is just the same KTable. The predicate function is called on any
> new event.
>
> Thank you,
> Nick DeCoursin
>


Re: Is it possible to resubcribe KafkaStreams in runtime to different set of topics?

2016-11-09 Thread Michael Noll
I am not aware of any short-term plans to support that, but perhaps others
in the community / mailing list are.

On Wed, Nov 9, 2016 at 11:15 AM, Timur Yusupov  wrote:

> Are there any nearest plans to support that?
>
> On Wed, Nov 9, 2016 at 1:11 PM, Michael Noll  wrote:
>
> > This is not possible at the moment.  However, depending on your use case,
> > you might be able to leverage regex topic subscriptions (think: "b*" to
> > read from all topics starting with letter `b`).
> >
> > On Wed, Nov 9, 2016 at 10:56 AM, Timur Yusupov 
> > wrote:
> >
> > > Hello,
> > >
> > > In our system it is possible to add/remove topics in runtime and we are
> > > trying to use KafkaStreams for incoming messages processing.
> > >
> > > It is possible to resubscribe KafkaStreams instance to updated set of
> > > topics?
> > >
> > > For now I see the only way is to shutdown exiting KafkaStreams instance
> > and
> > > start a new one, but sometimes that takes up to 30-40 seconds...
> > >
> > > Thanks, Timur.
> > >
> >
>


Re: Is it possible to resubcribe KafkaStreams in runtime to different set of topics?

2016-11-09 Thread Michael Noll
This is not possible at the moment.  However, depending on your use case,
you might be able to leverage regex topic subscriptions (think: "b*" to
read from all topics starting with letter `b`).
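
A rough sketch (assuming a Streams version with pattern subscription support
and default String serdes configured; the output topic name is made up):

    import java.util.regex.Pattern;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class PatternSubscriptionSketch {
      public static KStreamBuilder buildTopology() {
        KStreamBuilder builder = new KStreamBuilder();
        // Reads from all topics whose names start with "b"; matching topics
        // created later should be picked up after a metadata refresh.
        KStream<String, String> input = builder.stream(Pattern.compile("b.*"));
        input.to("combined-output");
        return builder;
      }
    }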

On Wed, Nov 9, 2016 at 10:56 AM, Timur Yusupov  wrote:

> Hello,
>
> In our system it is possible to add/remove topics at runtime, and we are
> trying to use KafkaStreams for incoming message processing.
>
> Is it possible to resubscribe a KafkaStreams instance to an updated set of
> topics?
>
> For now I see the only way is to shut down the existing KafkaStreams instance
> and start a new one, but sometimes that takes up to 30-40 seconds...
>
> Thanks, Timur.
>


Re: Kafka Streaming

2016-10-20 Thread Michael Noll
I suspect you are running Kafka 0.10.0.x on Windows?  If so, this is a
known issue that is fixed in Kafka 0.10.1, which was just released today.

Also: which examples are you referring to?  And, to confirm: which git
branch / Kafka version / OS in case my guess above was wrong.


On Thursday, October 20, 2016, Mohit Anchlia  wrote:

> I am trying to run the examples from git. While running the wordcount
> example I see this error:
>
> Caused by: *java.lang.RuntimeException*: librocksdbjni-win64.dll was not
> found inside JAR.
>
>
> Am I expected to include this jar locally?
>


-- 
*Michael G. Noll*
Product Manager | Confluent
+1 650 453 5860 | @miguno 
Follow us: Twitter  | Blog



Re: Dismissing late messages in Kafka Streams

2016-10-20 Thread Michael Noll
Nicolas,

> I set the maintain duration of the window to 30 days.
> If it consumes a message older than 30 days, then a new aggregate is created
for this old window.

I assume you mean:  If a message should have been included in the original
("old") window but that message happens to arrive late (after the
"original" 30 days), then a new aggregate is created for this old window?
I wanted to ask this first because answering your questions depends on what
exactly you mean here.


> The problem is that this old windowed aggregate is of course incomplete
and
> will overwrite a record in the final database.

Not sure I understand -- why would the old windowed aggregate be
incomplete?  Could you explain a bit more what you mean?


> By the way, is there any article about replaying old messages. Some tips
> and tricks, like "you'd better do that in another deployment of your
> topology", and/or "you'd better use topics dedicated to repair".

I am not aware of a deep dive article or docs on that just yet.  There's a
first blog post [1] about Kafka's new Application Reset Tool that goes into
this direction, but this is only a first step into the direction of
replaying/reprocessing of old messages.  Do you have specific questions
here that we can help you with in the meantime?

[1]
http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application






On Thu, Oct 20, 2016 at 9:40 AM, Nicolas Fouché  wrote:

> Hi,
>
> I aggregate some data with `aggregateByKey` and a `TimeWindows`.
>
> I set the maintain duration of the window to 30 days.
> If it consumes a message older than 30 days, then a new aggregate is
> created for this old window.
> The problem is that this old windowed aggregate is of course incomplete and
> will overwrite a record in the final database.
>
> So is there a way to dismiss these old messages ?
>
> I only see the point of accepting old messages when the topology is
> launched in "repair" mode.
> By the way, is there any article about replaying old messages. Some tips
> and tricks, like "you'd better do that in another deployment of your
> topology", and/or "you'd better use topics dedicated to repair".
>
> Thanks
> Nicolas
>


Re: How to block tests of Kafka Streams until messages processed?

2016-10-20 Thread Michael Noll
> Would there be any advantage to using the kafka connect method?

The advantage is to decouple the data processing (which you do in your app)
from the responsibility of making the processing results available to one
or more downstream systems, like Cassandra.

For example, what will your application (that uses Kafka Streams) do if
Cassandra is unavailable or slow?  Will you retry, and if so -- for how
long?  Retrying writes to external systems means that the time spent doing
this will not be spent on processing the next input records, thus
increasing the latency of your processing topology.  At this point you have
coupled your app to the availability of both Kafka and Cassandra.
Upgrading or doing maintenance on Cassandra will now also mean there's
potential impact on your app.
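
As for the "check in a loop" approach that Dave describes further down, a
minimal, framework-agnostic sketch of such a helper could look like this (the
condition you pass in -- e.g. "expected row exists in Cassandra" -- is
whatever lookup your test already performs):

    import java.util.concurrent.Callable;

    public final class TestAwaitSketch {
      // Polls a condition until it returns true or the timeout expires; a
      // stand-in for "check every second until either the result is found or
      // a timeout happens".
      public static void awaitCondition(Callable<Boolean> condition, long timeoutMs)
          throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
          if (Boolean.TRUE.equals(condition.call())) {
            return;
          }
          Thread.sleep(1000);
        }
        throw new AssertionError("Condition not met within " + timeoutMs + " ms");
      }
    }

Usage would then be something like awaitCondition(() -> cassandraHasRow(key), 30000L),
where cassandraHasRow() is a hypothetical lookup in your test code.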



On Thu, Oct 20, 2016 at 9:39 AM, Ali Akhtar  wrote:

> Michael,
>
> Would there be any advantage to using the kafka connect method? Seems like
> it'd just add an extra step of overhead?
>
> On Thu, Oct 20, 2016 at 12:35 PM, Michael Noll 
> wrote:
>
> > Ali,
> >
> > my main feedback is similar to what Eno and Dave have already said.  In
> > your situation, options like these are what you'd currently need to do
> > since you are writing directly from your Kafka Stream app to Cassandra,
> > rather than writing from your app to Kafka and then using Kafka Connect
> to
> > ingest into Cassandra.
> >
> >
> >
> > On Wed, Oct 19, 2016 at 11:03 PM, Ali Akhtar 
> wrote:
> >
> > > Yeah, I did think to use that method, but as you said, it writes to a
> > dummy
> > > output topic, which means I'd have to put in magic code just for the
> > tests
> > > to pass (the actual code writes to cassandra and not to a dummy topic).
> > >
> > >
> > > On Thu, Oct 20, 2016 at 2:00 AM, Tauzell, Dave <
> > > dave.tauz...@surescripts.com
> > > > wrote:
> > >
> > > > For similar queue related tests we put the check in a loop.  Check
> > every
> > > > second until either the result is found or a timeout  happens.
> > > >
> > > > -Dave
> > > >
> > > > -Original Message-
> > > > From: Ali Akhtar [mailto:ali.rac...@gmail.com]
> > > > Sent: Wednesday, October 19, 2016 3:38 PM
> > > > To: users@kafka.apache.org
> > > > Subject: How to block tests of Kafka Streams until messages
> processed?
> > > >
> > > > I'm using Kafka Streams, and I'm attempting to write integration
> tests
> > > for
> > > > a stream processor.
> > > >
> > > > The processor listens to a topic, processes incoming messages, and
> > writes
> > > > some data to Cassandra tables.
> > > >
> > > > I'm attempting to write a test which produces some test data, and
> then
> > > > checks whether or not the expected data was written to Cassandra.
> > > >
> > > > It looks like this:
> > > >
> > > > - Step 1: Produce data in the test
> > > > - Step 2: Kafka stream gets triggered
> > > > - Step 3: Test checks whether cassandra got populated
> > > >
> > > > The problem is, Step 3 is occurring before Step 2, and as a result,
> the
> > > > test fails as it doesn't find the data in the table.
> > > >
> > > > I've resolved this by adding a Thread.sleep(2000) call after Step 1,
> > > which
> > > > ensures that Step 2 gets triggered before Step 3.
> > > >
> > > > However, I'm wondering if there's a more reliable way of blocking the
> > > test
> > > > until Kafka stream processor gets triggered?
> > > >
> > > > At the moment, I'm using 1 thread for the processor. If I increase
> that
> > > to
> > > > 2 threads, will that achieve what I want?
> > > >
> > >
> >
>


Re: How to block tests of Kafka Streams until messages processed?

2016-10-20 Thread Michael Noll
Ali,

my main feedback is similar to what Eno and Dave have already said.  In
your situation, options like these are what you'd currently need to do
since you are writing directly from your Kafka Stream app to Cassandra,
rather than writing from your app to Kafka and then using Kafka Connect to
ingest into Cassandra.



On Wed, Oct 19, 2016 at 11:03 PM, Ali Akhtar  wrote:

> Yeah, I did think to use that method, but as you said, it writes to a dummy
> output topic, which means I'd have to put in magic code just for the tests
> to pass (the actual code writes to cassandra and not to a dummy topic).
>
>
> On Thu, Oct 20, 2016 at 2:00 AM, Tauzell, Dave <
> dave.tauz...@surescripts.com
> > wrote:
>
> > For similar queue related tests we put the check in a loop.  Check every
> > second until either the result is found or a timeout  happens.
> >
> > -Dave
> >
> > -Original Message-
> > From: Ali Akhtar [mailto:ali.rac...@gmail.com]
> > Sent: Wednesday, October 19, 2016 3:38 PM
> > To: users@kafka.apache.org
> > Subject: How to block tests of Kafka Streams until messages processed?
> >
> > I'm using Kafka Streams, and I'm attempting to write integration tests
> for
> > a stream processor.
> >
> > The processor listens to a topic, processes incoming messages, and writes
> > some data to Cassandra tables.
> >
> > I'm attempting to write a test which produces some test data, and then
> > checks whether or not the expected data was written to Cassandra.
> >
> > It looks like this:
> >
> > - Step 1: Produce data in the test
> > - Step 2: Kafka stream gets triggered
> > - Step 3: Test checks whether cassandra got populated
> >
> > The problem is, Step 3 is occurring before Step 2, and as a result, the
> > test fails as it doesn't find the data in the table.
> >
> > I've resolved this by adding a Thread.sleep(2000) call after Step 1,
> which
> > ensures that Step 2 gets triggered before Step 3.
> >
> > However, I'm wondering if there's a more reliable way of blocking the
> test
> > until Kafka stream processor gets triggered?
> >
> > At the moment, I'm using 1 thread for the processor. If I increase that
> to
> > 2 threads, will that achieve what I want?
> >
>


Re: kafka streams metadata request fails on 0.10.0.1 broker/topic from 0.10.1.0 client

2016-10-20 Thread Michael Noll
Absolutely, Sai.  That's exactly why we want to improve the
upgrade/compatibility story.


On Thu, Oct 20, 2016 at 12:28 AM, saiprasad mishra <
saiprasadmis...@gmail.com> wrote:

> Thanks Michael
> Hopefully the upgrade story evolves as 0.10.1+ advances to maturity
>
> Just my 2 cents
>
> Decoupling Kafka Streams from the core Kafka changes will help, so that
> the broker can be upgraded without notice and streaming apps can evolve to
> newer streaming features at their own pace.
>
> Regards
> Sai
>
>
> On Wednesday, October 19, 2016, Michael Noll  wrote:
>
> > Apps built with Kafka Streams 0.10.1 only work against Kafka clusters
> > running 0.10.1+.  This explains your error message above.
> >
> > Unfortunately, Kafka's current upgrade story means you need to upgrade
> your
> > cluster in this situation.  Moving forward, we're planning to improve the
> > upgrade/compatibility story of Kafka so that you could, for example, run
> a
> > newer version of Kafka Streams (or any other Kafka client) against an
> older
> > version of Kafka.
> >
> >
> >
> > On Tue, Oct 18, 2016 at 10:56 PM, saiprasad mishra <
> > saiprasadmis...@gmail.com> wrote:
> >
> > > Hi All
> > >
> > > Was testing with 0.10.1.0 rc3 build for my new streams app
> > >
> > > Seeing issues starting my kafk streams app( 0.10.1.0) on the old
> version
> > > broker 0.10.0.1. I dont know if it is supposed to work as is. Will
> > upgrade
> > > the broker to same version and see whether it goes away
> > >
> > > client side issues
> > >
> > > ==
> > >
> > > java.io.EOFException
> > >
> > > at
> > > org.apache.kafka.common.network.NetworkReceive.
> readFromReadableChannel(
> > > NetworkReceive.java:83)
> > > ~[kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.common.network.NetworkReceive.
> > > readFrom(NetworkReceive.java:71)
> > > ~[kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.common.network.KafkaChannel.receive(
> > > KafkaChannel.java:154)
> > > ~[kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at org.apache.kafka.common.network.KafkaChannel.read(
> > > KafkaChannel.java:135)
> > > ~[kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.
> > > java:343)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(
> > > ConsumerNetworkClient.java:232)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(
> > > ConsumerNetworkClient.java:209)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.
> > > awaitMetadataUpdate(ConsumerNetworkClient.java:148)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.
> > > awaitMetadataUpdate(ConsumerNetworkClient.java:136)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > > ensureCoordinatorReady(AbstractCoordinator.java:197)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> > > ConsumerCoordinator.java:248)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.clients.consumer.KafkaConsumer.
> > > pollOnce(KafkaConsumer.java:1013)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > KafkaConsumer.java:979)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > Stre

Re: kafka streams metadata request fails on 0.10.0.1 broker/topic from 0.10.1.0 client

2016-10-19 Thread Michael Noll
Apps built with Kafka Streams 0.10.1 only work against Kafka clusters
running 0.10.1+.  This explains your error message above.

Unfortunately, Kafka's current upgrade story means you need to upgrade your
cluster in this situation.  Moving forward, we're planning to improve the
upgrade/compatibility story of Kafka so that you could, for example, run a
newer version of Kafka Streams (or any other Kafka client) against an older
version of Kafka.



On Tue, Oct 18, 2016 at 10:56 PM, saiprasad mishra <
saiprasadmis...@gmail.com> wrote:

> Hi All
>
> Was testing with 0.10.1.0 rc3 build for my new streams app
>
> Seeing issues starting my Kafka Streams app (0.10.1.0) on the old version
> broker 0.10.0.1. I don't know if it is supposed to work as is. Will upgrade
> the broker to the same version and see whether it goes away.
>
> client side issues
>
> ==
>
> java.io.EOFException
>
> at
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(
> NetworkReceive.java:83)
> ~[kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.common.network.NetworkReceive.
> readFrom(NetworkReceive.java:71)
> ~[kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.common.network.KafkaChannel.receive(
> KafkaChannel.java:154)
> ~[kafka-clients-0.10.1.0.jar!/:?]
>
> at org.apache.kafka.common.network.KafkaChannel.read(
> KafkaChannel.java:135)
> ~[kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.
> java:343)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> ConsumerNetworkClient.java:232)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> ConsumerNetworkClient.java:209)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.
> awaitMetadataUpdate(ConsumerNetworkClient.java:148)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.
> awaitMetadataUpdate(ConsumerNetworkClient.java:136)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> ensureCoordinatorReady(AbstractCoordinator.java:197)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> ConsumerCoordinator.java:248)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1013)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:979)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:407)
> [kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:242)
> [kafka-streams-0.10.1.0.jar!/:?]
>
>
>
> On the broker side the following message appears
>
> =
>
> kafka.network.InvalidRequestException: Error getting request for apiKey: 3
> and apiVersion: 2
>
> at
> kafka.network.RequestChannel$Request.liftedTree2$1(
> RequestChannel.scala:95)
>
> at kafka.network.RequestChannel$Request.(RequestChannel.scala:87)
>
> at
> kafka.network.Processor$$anonfun$processCompletedReceives$1.
> apply(SocketServer.scala:488)
>
> at
> kafka.network.Processor$$anonfun$processCompletedReceives$1.
> apply(SocketServer.scala:483)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>
> at kafka.network.Processor.processCompletedReceives(
> SocketServer.scala:483)
>
> at kafka.network.Processor.run(SocketServer.scala:413)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.IllegalArgumentException: Invalid version for API key
> 3: 2
>
> at org.apache.kafka.common.protocol.ProtoUtils.schemaFor(
> ProtoUtils.java:31)
>
> at
> org.apache.kafka.common.protocol.ProtoUtils.requestSchema(ProtoUtils.java:
> 44)
>
> at
> org.apache.kafka.common.protocol.ProtoUtils.parseRequest(ProtoUtils.java:
> 60)
>
> at
> org.apache.kafka.common.requests.MetadataRequest.
> parse(MetadataRequest.java:96)
>
> at
> org.apache.kafka.common.requests.AbstractRequest.
> getRequest(AbstractRequest.java:48)
>
> at
> kafka.network.RequestChannel$Request.liftedTree2$1(
> RequestChannel.scala:92)
>
> Regards
>
> Sai
>


Re: How to detect an old consumer

2016-10-17 Thread Michael Noll
Old consumers use ZK to store their offsets.  Could you leverage the
timestamps of the corresponding znodes [1] for this?


[1]
https://zookeeper.apache.org/doc/r3.4.5/zookeeperProgrammers.html#sc_zkDataModel_znodes
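
For example, with the ZooKeeper Java client (group/topic/partition below are
made up; old consumers store offsets under
/consumers/<group>/offsets/<topic>/<partition>):

    import java.util.Date;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;

    public class ConsumerLastSeenSketch {
      public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, event -> { });
        String path = "/consumers/my-group/offsets/my-topic/0";  // hypothetical
        Stat stat = zk.exists(path, false);
        if (stat != null) {
          // mtime = last time this group committed an offset for the partition
          System.out.println("Last offset commit at " + new Date(stat.getMtime()));
        }
        zk.close();
      }
    }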


On Mon, Oct 17, 2016 at 4:45 PM, Fernando Bugni 
wrote:

> Hello,
>
> I want to detect old consumers in my kafka servers. Is there any tool to
> see the last date when they connected? I tried in kafka-manager but I only
> have the consumer group and its offset, which is not useful to detect that
> problem...
>
> Thanks in advance!
> --
> Fernando Bugni
>


Re: Understanding out of order message processing w/ Streaming

2016-10-13 Thread Michael Noll
> But if they arrive out of order, I have to detect / process that myself in
> the processor logic.

Yes -- if your processing logic depends on the specific ordering of
messages (which is the case for you), then you must manually implement this
ordering-specific logic at the moment.

Other use cases may not need to do that and "just work" even with
out-of-order data.  If, for example, you are counting objects or are
computing the sum of numbers, then you do not need to do anything special.
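
To make the "detect/process that yourself" part a bit more concrete, here is
a rough sketch along the lines of what Matthias describes further down: the
value type carries its own event timestamp, and the aggregation simply keeps
the newer of the two values (types and the store name are hypothetical,
assuming the 0.10.1 DSL and appropriate default serdes).

    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;

    public class LatestBidSketch {

      // Value type that carries the event timestamp alongside the actual data.
      public static class TimestampedBid {
        public final long timestamp;
        public final double price;
        public TimestampedBid(long timestamp, double price) {
          this.timestamp = timestamp;
          this.price = price;
        }
      }

      // Keeps, per product key, only the bid with the highest timestamp; a
      // late-arriving (older) bid leaves the current value untouched.
      public static KTable<String, TimestampedBid> latestBidPerProduct(
          KStream<String, TimestampedBid> bids) {
        return bids.groupByKey().reduce(
            (currentLatest, incoming) ->
                incoming.timestamp >= currentLatest.timestamp ? incoming : currentLatest,
            "latest-bid-store");
      }
    }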





On Wed, Oct 12, 2016 at 10:22 PM, Ali Akhtar  wrote:

> Thanks Matthias.
>
> So, if I'm understanding this right, Kafka will not discard messages
> which arrive out of order.
>
> What it will do is show messages in the order in which they arrive.
>
> But if they arrive out of order, I have to detect / process that myself in
> the processor logic.
>
> Is that correct?
>
> Thanks.
>
> On Wed, Oct 12, 2016 at 11:37 PM, Matthias J. Sax 
> wrote:
>
> > -BEGIN PGP SIGNED MESSAGE-
> > Hash: SHA512
> >
> > Last question first: A KTable is basically an infinite window over the
> > whole stream providing a single result (that gets updated when new
> > data arrives). If you use windows, you cut the overall stream into
> > finite subsets and get a result per window. Thus, I guess you do not
> > need windows (if I understood your use case correctly).
> >
> > However, in the current state of the Kafka Streams DSL, you will not be
> > able to use KTable (directly -- see suggestion to fix this below) because
> > it does (currently) not allow you to access the timestamp of the current
> > record (thus, you cannot know if a record is late or not). You will
> > need to use the Processor API, which allows you to access the current
> > record's timestamp via the Context object given in init().
> >
> > Your reasoning about partitions and Streams instances is correct.
> > However, the following two are not
> >
> > > - Because I'm using a KTable, the timestamp of the messages is
> > > extracted, and I'm not shown the older bid because I've already
> > > processed the later bid. The older bid is ignored.
> >
> > and
> >
> > > - Because of this, the replica already knows which timestamps it
> > > has processed, and is able to ignore the older messages.
> >
> > Late arriving records are not dropped but processed regularly. Thus,
> > your KTable aggregate function will be called for the late arriving
> > record, too (but as described above, you currently have no way to know
> > it is a late record).
> >
> >
> > Last but not least, your last statement is a valid concern:
> >
> > > Also, what will happen if bid 2 arrived and got processed, and then
> > > the particular replica crashed, and was restarted. The restarted
> > > replica won't have any memory of which timestamps it has previously
> > > processed.
> > >
> > > So if bid 2 got processed, replica crashed and restarted, and then
> > > bid 1 arrived, what would happen in that case?
> >
> > In order to make this work, you would need to store the timestamp in
> > your store next to the actual data. Thus, you can compare the timestamp
> > of the latest result (safely stored in operator state) with the
> > timestamp of the current record.
> >
> > Does this makes sense?
> >
> > To fix your issue, you could add a .transformValues() before your KTable,
> > which allows you to access the timestamp of a record. If you add this
> > timestamp to your value and pass it to the KTable afterwards, you can
> > access it and it also gets stored reliably.
> >
> > record => transformValues => {value + timestamp} => aggregate
> >
> > Hope this helps.
> >
> > - -Matthias
> >
> >
> > On 10/11/16 9:12 PM, Ali Akhtar wrote:
> > > P.S, does my scenario require using windows, or can it be achieved
> > > using just KTable?
> > >
> > > On Wed, Oct 12, 2016 at 8:56 AM, Ali Akhtar 
> > > wrote:
> > >
> > >> Heya,
> > >>
> > >> Say I'm building a live auction site, with different products.
> > >> Different users will bid on different products. And each time
> > >> they do, I want to update the product's price, so it should
> > >> always have the latest price in place.
> > >>
> > >> Example: Person 1 bids $3 on Product A, and Person 2 bids $5 on
> > >> the same product 100 ms later.
> > >>
> > >> The second bid arrives first and the price is updated to $5. Then
> > >> the first bid arrives. I want the price to not be updated in this
> > >> case, as this bid is older than the one I've already processed.
> > >>
> > >> Here's my understanding of how I can achieve this with Kafka
> > >> Streaming - is my understanding correct?
> > >>
> > >> - I have a topic for receiving bids. The topic has N partitions,
> > >> and I have N replicas of my application which hooks up w/ Kafka
> > >> Streaming, up and running.
> > >>
> > >> - I assume each replica of my app will listen to a different
> > >> partition of the topic.
> > >>
> > >> - A user makes a bid on product A.
> > >>
> > >> - This is pushed to the topic with the key bid_a
> > >>
> > >> - Another user makes a bid

Re: KafkaStream Merging two topics is not working for custom datatypes

2016-10-12 Thread Michael Noll
> >> streams.close();
> >>
> >>  }
> >>
> >> }
> >>
> >> }
> >>
> >>
> >>
> >>
> >> *SERDE*
> >>
> >>
> >> public class KafkaPayloadSerdes {
> >>
> >> static private class WrapperSerde<T> implements
> >> Serde<T> {
> >> final private Serializer<T> serializer;
> >> final private Deserializer<T> deserializer;
> >>
> >> public WrapperSerde(Serializer<T> serializer,
> >> Deserializer<T> deserializer) {
> >> this.serializer = serializer;
> >> this.deserializer = deserializer;
> >> }
> >>
> >> @Override
> >> public void configure(Map<String, ?> configs, boolean isKey) {
> >> serializer.configure(configs, isKey);
> >> deserializer.configure(configs, isKey);
> >> }
> >>
> >> @Override
> >> public void close() {
> >> serializer.close();
> >> deserializer.close();
> >> }
> >>
> >> @Override
> >> public Serializer<T> serializer() {
> >> return serializer;
> >> }
> >>
> >> @Override
> >> public Deserializer<T> deserializer() {
> >> return deserializer;
> >> }
> >> }
> >>
> >> static public final class KafkaPayloadSerde extends
> >> WrapperSerde<KafkaPayload> {
> >> public KafkaPayloadSerde() {
> >> super(new KafkaPayloadSerializer(), new KafkaPayloadSerializer());
> >> }
> >> }
> >>
> >> /**
> >> * A serde for nullable <KafkaPayload> type.
> >> */
> >> static public Serde<KafkaPayload> KafkaPayload() {
> >> return new KafkaPayloadSerde();
> >> }
> >>
> >> }
> >>
> >>
> >> *Serilizer/Deserializer*
> >>
> >>
> >>
> >> public class KafkaPayloadSerializer implements Serializer<KafkaPayload>,
> >> Deserializer<KafkaPayload> {
> >>
> >> private static final Logger log = org.apache.logging.log4j.LogManager
> >> .getLogger(MethodHandles.lookup().lookupClass().getCanonicalName());
> >>
> >> @Override
> >> public KafkaPayload deserialize(String topic, byte[] arg1) {
> >> ByteArrayInputStream bis = new ByteArrayInputStream(arg1);
> >> ObjectInput in = null;
> >> Object obj = null;
> >> try {
> >> in = new ObjectInputStream(bis);
> >> obj = in.readObject();
> >> } catch (IOException e) {
> >> log.error(e);
> >> } catch (ClassNotFoundException e) {
> >> log.error(e);
> >> } finally {
> >> try {
> >> bis.close();
> >> if (in != null) {
> >> in.close();
> >> }
> >> } catch (IOException ex) {
> >> log.error(ex);
> >> }
> >> }
> >> return (KafkaPayload) obj;
> >> }
> >>
> >> @Override
> >> public void close() {
> >> // TODO Auto-generated method stub
> >>
> >> }
> >>
> >> @Override
> >> public byte[] serialize(String topic, KafkaPayload kpayload) {
> >> ByteArrayOutputStream bos = new ByteArrayOutputStream();
> >> ObjectOutput out = null;
> >> byte[] payload = null;
> >> try {
> >> out = new ObjectOutputStream(bos);
> >> out.writeObject(kpayload);
> >> payload = bos.toByteArray();
> >>
> >> } catch (IOException e) {
> >> e.printStackTrace();
> >> } finally {
> >> try {
> >> if (out != null) {
> >> out.close();
> >> bos.close();
> >> }
> >> } catch (Exception ex) {
> >> log.error(ex);
> >> }
> >> }
> >> return payload;
> >> }
> >>
> >> @Override
> >> public void configure(Map<String, ?> configs, boolean isKey) {
> >> // TODO Auto-generated method stub
> >>
> >> }
> >>
> >> }
> >>
> >>
> >>
> >> On 11 October 2016 at 20:13, Michael Noll  wrote:
> >>
> >>> When I wrote:
> >>>
> >>> "If you haven't changed to default key and value serdes, then
> `to()`
> >>> will fail because [...]"
> >>>
> >>> it should have read:
> >>>
> >>> "If you haven't changed the default key and value serdes, then
> `to()`
> >>> will fail because [...]"
> >>>
> >>>
> >>>
> >>> On Tue, Oct 11, 2016 at 11:12 AM, Michael Noll 
> >>> wrote:
> >>>
> >>> > Ratha,

Re: Support for Kafka

2016-10-11 Thread Michael Noll
Actually, I wanted to include the following link for the JVM docs (the
information matches what's written in the earlier link I shared):
http://kafka.apache.org/documentation#java


On Tue, Oct 11, 2016 at 11:21 AM, Michael Noll  wrote:

> Regarding the JVM, we recommend running the latest version of JDK 1.8 with
> the G1 garbage collector:
> http://docs.confluent.io/current/kafka/deployment.html#jvm
>
> And yes, Kafka does run on Ubuntu 16.04, too.
>
> (Confluent provides .deb packages [1] for Apache Kafka if you are looking
> for these to install Kafka on Ubuntu.)
>
> Hope this helps,
> Michael
>
>
>
> [1] http://docs.confluent.io/current/installation.html
>
>
>
>
> On Mon, Oct 10, 2016 at 1:38 PM, Jens Rantil  wrote:
>
>> Hi Syed,
>>
>> Apache Kafka runs on a JVM. I think the question you should ask is --
>> which
>> JVM does Apache Kafka require in production*? It doesn't really depend on
>> anything specific to a Linux distribution.
>>
>> * ...and I don't have that answer ;-)
>>
>> Cheers,
>> Jens
>>
>> On Wednesday, October 5, 2016, Syed Hussaini <
>> syed.hussa...@theexchangelab.com> wrote:
>>
>> > Dear Kafka team.
>> >
>> > I am in the implementation stage of a Kafka cluster and looking to
>> find
>> > out whether Apache Kafka is supported on Ubuntu 16.04 LTS – Xenial.
>> >
>> >
>> >
>> > Would be great if you please let us know.
>> >
>> >
>> >
>> >
>> >
>> > [image: The Exchange Lab] <http://www.theexchangelab.com/>
>> >
>> > *Syed Hussaini*
>> > Infrastructure Engineer
>> >
>> > 1 Neathouse Place
>> > 5th Floor
>> > London, England, SW1V 1LH
>> >
>> >
>> > syed.hussa...@theexchangelab.com
>> > 
>> >
>> > T 0203 701 3177
>> >
>> >
>> > --
>> >
>> > Follow us on Twitter: @exchangelab <https://twitter.com/exchangelab> |
>> Visit
>> > us on LinkedIn: The Exchange Lab
>> > <https://www.linkedin.com/company/the-exchange-lab>
>> >
>> >
>> >
>> >
>> >
>>
>>
>> --
>> Jens Rantil
>> Backend engineer
>> Tink AB
>>
>> Email: jens.ran...@tink.se
>> Phone: +46 708 84 18 32
>> Web: www.tink.se
>>
>> Facebook <https://www.facebook.com/#!/tink.se> Linkedin
>> <http://www.linkedin.com/company/2735919?trk=vsrp_companies_
>> res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVS
>> RPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
>>  Twitter <https://twitter.com/tink>
>>
>
>
>
>


Re: Support for Kafka

2016-10-11 Thread Michael Noll
Regarding the JVM, we recommend running the latest version of JDK 1.8 with
the G1 garbage collector:
http://docs.confluent.io/current/kafka/deployment.html#jvm

And yes, Kafka does run on Ubuntu 16.04, too.

(Confluent provides .deb packages [1] for Apache Kafka if you are looking
for these to install Kafka on Ubuntu.)

Hope this helps,
Michael



[1] http://docs.confluent.io/current/installation.html




On Mon, Oct 10, 2016 at 1:38 PM, Jens Rantil  wrote:

> Hi Syed,
>
> Apache Kafka runs on a JVM. I think the question you should ask is -- which
> JVM does Apache Kafka require in production*? It doesn't really depend on
> anything specific to a Linux distribution.
>
> * ...and I don't have that answer ;-)
>
> Cheers,
> Jens
>
> On Wednesday, October 5, 2016, Syed Hussaini <
> syed.hussa...@theexchangelab.com> wrote:
>
> > Dear Kafka team.
> >
> > I am in the implementation stage of a Kafka cluster and looking to find
> > out whether Apache Kafka is supported on Ubuntu 16.04 LTS – Xenial.
> >
> >
> >
> > Would be great if you please let us know.
> >
> >
> >
> >
> >
> > [image: The Exchange Lab] 
> >
> > *Syed Hussaini*
> > Infrastructure Engineer
> >
> > 1 Neathouse Place
> > 5th Floor
> > London, England, SW1V 1LH
> >
> >
> > syed.hussa...@theexchangelab.com
> > 
> >
> > T 0203 701 3177
> >
> >
> > --
> >
> > Follow us on Twitter: @exchangelab  |
> Visit
> > us on LinkedIn: The Exchange Lab
> > 
> >
> >
> >
> >
> >
>
>
> --
> Jens Rantil
> Backend engineer
> Tink AB
>
> Email: jens.ran...@tink.se
> Phone: +46 708 84 18 32
> Web: www.tink.se
>
> Facebook  Linkedin
>  companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%
> 2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
>  Twitter 
>


Re: puncutuate() never called

2016-10-11 Thread Michael Noll
Thanks for the follow-up and the bug report, David.

We're taking a look at that.
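For anyone else hitting this, below is a minimal sketch of the 0.10.x
Processor API pattern being discussed further down in this thread. The class
name is made up and the topology wiring is omitted; the point it illustrates,
as explained below, is that punctuate() is driven by stream time, so it only
fires as new records advance the timestamps.

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class PeriodicWorkProcessor implements Processor<String, String> {

        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            // Request punctuate() roughly every 60 seconds of stream time.
            // Stream time only advances as new records arrive, so with no
            // (or too little) input this callback will not fire.
            context.schedule(60 * 1000L);
        }

        @Override
        public void process(String key, String value) {
            // per-record processing goes here
        }

        @Override
        public void punctuate(long timestamp) {
            // periodic work; not invoked while no new data is flowing
        }

        @Override
        public void close() {
            // nothing to clean up
        }
    }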



On Mon, Oct 10, 2016 at 4:36 PM, David Garcia  wrote:

> Thx for the responses.  I was able to identify a bug in how the times are
> obtained (offsets resolved as unknown cause the issue):
>
> “Actually, I think the bug is more subtle.  What happens when a consumed
> topic stops receiving messages?  The smallest timestamp will always be the
> static timestamp of this topic.
>
> -David
>
> On 10/7/16, 5:03 PM, "David Garcia"  wrote:
>
> Ok I found the bug.  Basically, if there is an empty topic (in the
> list of topics being consumed), any partition-group with partitions from
> the topic will always return -1 as the smallest timestamp (see
> PartitionGroup.java).
>
> To reproduce, simply start a kstreams consumer with one or more empty
> topics.  Punctuate will never be called.
>
> -David ”
>
> On 10/10/16, 1:55 AM, "Michael Noll"  wrote:
>
> > We have run the application (and have confirmed data is being
> received)
> for over 30 mins…with a 60-second timer.
>
> Ok, so your app does receive data but punctuate() still isn't being
> called.
> :-(
>
>
> > So, do we need to just rebuild our cluster with bigger machines?
>
> That's worth trying out.  See
> http://www.confluent.io/blog/design-and-deployment-
> considerations-for-deploying-apache-kafka-on-aws/
> for some EC2 instance types recommendations.
>
> But I'd also suggest to look into the logs of (1) your application,
> (2) the
> log files of the Kafka broker(s), and (3) the log files of ZooKeeper
> to see
> whether you see anything suspicious?
>
> Sorry for not being able to provide more actionable feedback at this
> point.  Typically we have seen such issues only (but not exclusively)
> in
> cases where there have been problems in the environment in which your
> application is running and/or the environment of the Kafka clusters.
> Unfortunately these environment problems are a bit tricky to debug
> remotely
> via the mailing list.
>
> -Michael
>
>
>
>
>
> On Fri, Oct 7, 2016 at 8:11 PM, David Garcia 
> wrote:
>
> > Yeah, this is possible.  We have run the application (and have
> confirmed
> > data is being received) for over 30 mins…with a 60-second timer.
> So, do we
> > need to just rebuild our cluster with bigger machines?
> >
> > -David
> >
> > On 10/7/16, 11:18 AM, "Michael Noll"  wrote:
> >
> > David,
> >
> > punctuate() is still data-driven at this point, even when you're
> using
> > the
> > WallClock timestamp extractor.
> >
> > To use an example: Imagine you have configured punctuate() to be
> run
> > every
> > 5 seconds.  If there's no data being received for a minute, then
> > punctuate
> > won't be called -- even though you probably would have expected
> this to
> > happen 12 times during this 1 minute.
> >
> > (FWIW, there's an ongoing discussion to improve punctuate(),
> part of
> > which
> > is motivated by the current behavior that arguably is not very
> > intuitive to
> > many users.)
> >
> > Could this be the problem you're seeing?  See also the related
> > discussion
> > at
> > http://stackoverflow.com/questions/39535201/kafka-problems-with-
> > timestampextractor
> > .
> >
> >
> >
> >
> >
> >
> > On Fri, Oct 7, 2016 at 6:07 PM, David Garcia <
> dav...@spiceworks.com>
> > wrote:
> >
> > > Hello, I’m sure this question has been asked many times.
> > > We have a test-cluster (confluent 3.0.0 release) of 3 aws
> > m4.xlarges.  We
> > > have an application that needs to use the punctuate() function
> to do
> > some
> > > work on a regular interval.  We are using the WallClock
> extractor.
> > > Unfortunately, the method is never called.  I have checked the
> > > filedescriptor setting for both the user as well as the
> process, and
> > > everything seems to be fine.  Is this a known bug, or is there
> > something
> > > obvious I’m missing?
> > >
> > > One note, the application used to work on this cluster, but
> now it’s
> > not
> > > working.  Not really sure what is going on?
> > >
> > > -David
> > >
> >
> >
> >
>
>
>


Re: KafkaStream Merging two topics is not working fro custom datatypes

2016-10-11 Thread Michael Noll
When I wrote:

"If you haven't changed to default key and value serdes, then `to()`
will fail because [...]"

it should have read:

"If you haven't changed the default key and value serdes, then `to()`
will fail because [...]"



On Tue, Oct 11, 2016 at 11:12 AM, Michael Noll  wrote:

> Ratha,
>
> if you based your problematic code on the PipeDemo example, then you
> should have these two lines in your code (which most probably you haven't
> changed):
>
> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.String().getClass());
> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass());
>
> This configures your application to interpret (= encode/decode), by
> default, the keys and values of any messages it reads from Kafka as
> strings.  This works for the PipeDemo example because the keys and values
> are actually strings.
>
> In your application, however, you do:
>
>    KStream<String, KafkaPayload> kafkaPayloadStream =
> builder.stream(sourceTopics);
>
> This won't work, because `builder.stream()`, when calling it without
> explicit serdes, will use the default serdes configured for your
> application.  So `builder.stream(sourceTopics)` will give you
> `KStream<String, String>`, not `KStream<String, KafkaPayload>`.  Also, you
> can't just cast a String to KafkaPayload to "fix" the problem;  if you
> attempt to do so you run into the ClassCastException that you reported
> below.
>
> What you need to do to fix your problem is:
>
> 1. Provide a proper serde for `KafkaPayload`.  See
> http://docs.confluent.io/current/streams/developer-
> guide.html#implementing-custom-serializers-deserializers-serdes.  There
> are also example implementations of such custom serdes at [1] and [2].
>
> Once you have that, you can e.g. write:
>
> final Serde<String> stringSerde = Serdes.String(); // provided by Kafka
> final Serde<KafkaPayload> kafkaPayloadSerde = ...; // must be provided
> by you!
>
> 2.  Call `builder.stream()` with explicit serdes to override the default
> serdes.  stringSerde is for the keys, kafkaPayloadSerde is for the values.
>
> KStream<String, KafkaPayload> kafkaPayloadStream =
> builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics);
>
> That should do it.
>
> Lastly, you must think about serialization also when calling `to()` or
> `through()`:
>
> kafkaPayloadStream.to(targetTopic);
>
> If you haven't changed to default key and value serdes, then `to()` will
> fail because it will by default (in your app configuration) interpret
> message values still as strings rather than KafkaPayload.  To fix this you
> should call:
>
> kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);
>
> You need to override the default serdes whenever the data must be written
> with, well, non-default serdes.
>
> I'd recommend reading http://docs.confluent.io/current/streams/developer-
> guide.html#data-types-and-serialization to better understand how this
> works.
>
>
> Hope this helps,
> Michael
>
>
>
> [1] http://docs.confluent.io/current/streams/developer-
> guide.html#available-serializers-deserializers-serdes
> [2] https://github.com/confluentinc/examples/tree/
> kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/
> confluent/examples/streams/utils
>
>
>
>
> On Tue, Oct 11, 2016 at 7:38 AM, Ratha v  wrote:
>
>> I checked my target topic and I see fewer messages than the source topic.
>> (If
>> the source topic has 5 messages, I see 2 messages in my target topic.) What
>> settings do I need to change?
>>
>> And, when I try to consume message from the target topic, I get ClassCast
>> Exception.
>>
>> java.lang.ClassCastException: java.lang.String cannot be cast to
>> xx.yy.core.kafkamodels.KafkaPayload;
>>
>> * receivedPayload = (KafkaPayload) consumerRecord.value();*
>>
>>
>> I Merge two topics like;
>>
>> * KStreamBuilder builder = new KStreamBuilder();*
>>
>> * KStream<String, KafkaPayload> kafkaPayloadStream =
>> builder.stream(sourceTopics);*
>>
>> * kafkaPayloadStream.to(targetTopic);*
>>
>> * streams = new KafkaStreams(builder, properties);*
>>
>> * streams.start();*
>>
>>
>> Why do I see classcast exception when consuming the message?
>>
>>
>> On 11 October 2016 at 15:19, Ratha v  wrote:
>>
>> > Hi all;
>> > I have a custom datatype defined (a POJO class).
>> > I copy messages from one topic to another topic.
>> > I do not see any messages in my target topic.
>> > This works for string messages, but not for my custom message.
>> > What might be the cause?
>> > I followed this sample [1]
>> > [1]
>> > https://github.com/apache/kafka/blob/trunk/streams/
>> > examples/src/main/java/org/apache/kafka/streams/examples/
>> > pipe/PipeDemo.java
>> >
>> >
>> > --
>> > -Ratha
>> > http://vvratha.blogspot.com/
>> >
>>
>>
>>
>> --
>> -Ratha
>> http://vvratha.blogspot.com/
>>
>
>
>


Re: KafkaStream Merging two topics is not working fro custom datatypes

2016-10-11 Thread Michael Noll
Ratha,

if you based your problematic code on the PipeDemo example, then you should
have these two lines in your code (which most probably you haven't changed):

props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());

This configures your application to interpret (= encode/decode), by
default, the keys and values of any messages it reads from Kafka as
strings.  This works for the PipeDemo example because the keys and values
are actually strings.

In your application, however, you do:

   KStream<String, KafkaPayload> kafkaPayloadStream =
builder.stream(sourceTopics);

This won't work, because `builder.stream()`, when calling it without
explicit serdes, will use the default serdes configured for your
application.  So `builder.stream(sourceTopics)` will give you
`KStream<String, String>`, not `KStream<String, KafkaPayload>`.  Also, you
can't just cast a String to KafkaPayload to "fix" the problem;  if you
attempt to do so you run into the ClassCastException that you reported
below.

What you need to do to fix your problem is:

1. Provide a proper serde for `KafkaPayload`.  See
http://docs.confluent.io/current/streams/developer-guide.html#implementing-custom-serializers-deserializers-serdes.
There are also example implementations of such custom serdes at [1] and [2].

Once you have that, you can e.g. write:

final Serde<String> stringSerde = Serdes.String(); // provided by Kafka
final Serde<KafkaPayload> kafkaPayloadSerde = ...; // must be provided
by you!

2.  Call `builder.stream()` with explicit serdes to override the default
serdes.  stringSerde is for the keys, kafkaPayloadSerde is for the values.

KStream<String, KafkaPayload> kafkaPayloadStream =
builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics);

That should do it.

Lastly, you must think about serialization also when calling `to()` or
`through()`:

kafkaPayloadStream.to(targetTopic);

If you haven't changed to default key and value serdes, then `to()` will
fail because it will by default (in your app configuration) interpret
message values still as strings rather than KafkaPayload.  To fix this you
should call:

kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);

You need to override the default serdes whenever the data must be written
with, well, non-default serdes.

I'd recommend reading
http://docs.confluent.io/current/streams/developer-guide.html#data-types-and-serialization
to better understand how this works.


Hope this helps,
Michael



[1]
http://docs.confluent.io/current/streams/developer-guide.html#available-serializers-deserializers-serdes
[2]
https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/confluent/examples/streams/utils
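
As a footnote, and only as a sketch: another option is to make the custom
serde the application-wide default for values, so that the plain
`builder.stream(sourceTopics)` and `to(targetTopic)` calls work without
explicit serde arguments. This assumes KafkaPayloadSerde is a public class
with a no-argument constructor (as in the serde code shared in this thread);
only the relevant lines of the PipeDemo-style setup are shown.

    Properties props = new Properties();
    // Keys remain strings by default.
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    // Values now default to the custom KafkaPayload serde instead of strings.
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, KafkaPayloadSerde.class);

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, KafkaPayload> kafkaPayloadStream = builder.stream(sourceTopics);
    kafkaPayloadStream.to(targetTopic);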




On Tue, Oct 11, 2016 at 7:38 AM, Ratha v  wrote:

> I checked my target topic and I see fewer messages than the source topic. (If
> the source topic has 5 messages, I see 2 messages in my target topic.) What
> settings do I need to change?
>
> And, when I try to consume message from the target topic, I get ClassCast
> Exception.
>
> java.lang.ClassCastException: java.lang.String cannot be cast to
> xx.yy.core.kafkamodels.KafkaPayload;
>
> * receivedPayload = (KafkaPayload) consumerRecord.value();*
>
>
> I Merge two topics like;
>
> * KStreamBuilder builder = new KStreamBuilder();*
>
> * KStream<String, KafkaPayload> kafkaPayloadStream =
> builder.stream(sourceTopics);*
>
> * kafkaPayloadStream.to(targetTopic);*
>
> * streams = new KafkaStreams(builder, properties);*
>
> * streams.start();*
>
>
> Why do I see classcast exception when consuming the message?
>
>
> On 11 October 2016 at 15:19, Ratha v  wrote:
>
> > Hi all;
> > I have a custom datatype defined (a POJO class).
> > I copy messages from one topic to another topic.
> > I do not see any messages in my target topic.
> > This works for string messages, but not for my custom message.
> > What might be the cause?
> > I followed this sample [1]
> > [1]
> > https://github.com/apache/kafka/blob/trunk/streams/
> > examples/src/main/java/org/apache/kafka/streams/examples/
> > pipe/PipeDemo.java
> >
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
> >
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>


Re: sFlow/NetFlow/Pcap Plugin for Kafka Producer

2016-10-10 Thread Michael Noll
Aris,

I am not aware of an out of the box tool for Pcap->Kafka ingestion (in my
case back then we wrote our own).  Maybe others know.
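
For anyone who wants to roll their own, below is a minimal sketch of that
approach (capture packets, encode them, produce them to Kafka). The
capturePacket() helper and the topic name are placeholders rather than a real
library call; in practice you would plug in a pcap library and encode each
packet with something like Avro instead of shipping raw bytes.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class PcapToKafka {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");

            try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
                byte[] packet;
                // capturePacket() stands in for whatever capture library is used.
                while ((packet = capturePacket()) != null) {
                    producer.send(new ProducerRecord<>("pcap-packets", packet));
                }
            }
        }

        private static byte[] capturePacket() {
            // Placeholder: wire in a real packet capture (e.g. via a pcap library).
            return null;
        }
    }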



On Monday, October 10, 2016, Aris Risdianto  wrote:

> Thank you for the answer, Michael.
>
> Actually, I have made a simple producer from Pcap to Kafka. Since it is not
> structured, it is difficult for a consumer to process further. But I
> will take a look at Avro as you mentioned.
>
> I was just wondering if there is any proper implementation for this
> requirement, because I couldn't find any tool on the Kafka ecosystem page.
>
> https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem
>
>
> Best Regards,
> Aris.
>
>
> On Mon, Oct 10, 2016 at 6:55 PM, Michael Noll  > wrote:
>
> > Aris,
> >
> > even today you can already use Kafka to deliver Netflow/Pcap/etc.
> messages,
> > and people are already using it for that (I did that in previous projects
> > of mine, too).
> >
> > Simply encode your Pcap/... messages appropriately (I'd recommend to
> take a
> > look at Avro, which allows you to structure your data similar to e.g.
> > Pcap's native format [1]), and then write the encoded messages to Kafka.
> > Your downstream applications can then read the encoded messages back from
> > Kafka, decode, and commence processing.
> >
> > That was a brief summary to get you started, feel free to take a look at
> > the Apache Kafka docs at kafka.apache.org and/or ask further questions
> > here.
> >
> > -Michael
> >
> >
> >
> >
> > [1] https://wiki.wireshark.org/Development/LibpcapFileFormat
> >
> > On Mon, Oct 10, 2016 at 11:19 AM, Aris Risdianto  > wrote:
> >
> > > ​Hello,
> > >
> > >
> > > ​Is there any plan or implementation to use Kafka for delivering
> > > sFlow/NetFlow/Pcap messages?
> > >
> > >
> > > Best Regards,
> > > Aris.
> > >
> >
>


-- 
*Michael G. Noll*
Product Manager | Confluent
+1 650 453 5860 | @miguno <https://twitter.com/miguno>
Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog
<http://www.confluent.io/blog>


  1   2   >