Re: APACHE LICENSES

2017-05-23 Thread Mathieu Fenniak
Hi Lis,

Yes, they are free software.  The full terms of the licenses are
available here: https://github.com/apache/kafka/blob/trunk/LICENSE and
here: https://github.com/apache/zookeeper/blob/master/LICENSE.txt

Mathieu


On Tue, May 23, 2017 at 5:54 AM, LISBETH SANTAMARIA GUTIERREZ
 wrote:
> Good morning,
>
> Are the Kafka & Zookeeper licenses free software?
>
>
>
> Kind regards
>
>
> *BBVA*
> *Lis Santamaría Gutiérrez*
> *CIB Engineering -  CTO -  Technical Systems & Solutions*
> E-mail: lisbeth.santamaria.contrac...@bbva.com
> 
>
> *LAS TABLAS III  - Isabel de Colbrand 4 - pl 1 – 28050 Madrid*
>


Re: Data loss after a Kafka broker restart scenario.

2017-05-16 Thread Mathieu Fenniak
Hi Fathima,

Setting "retries=0" on the producer means that an attempt to produce a
message, if it encounters an error, will result in that message being
lost.  It's likely the producer will encounter intermittent errors when you
kill one broker in the cluster.

I'd suggest trying this test with a higher value for "retries".  Note that
you'll only be guaranteed at-least-once processing, not exactly-once.
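
For reference, a minimal sketch of the producer settings I mean (the broker
address, topic name, and serializers below are just placeholders for your setup):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RetryingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "serverA:9092");  // placeholder broker address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");   // wait for all in-sync replicas to acknowledge
        props.put("retries", 10);   // retry transient errors instead of dropping the record
        props.put("max.in.flight.requests.per.connection", 1);  // keep ordering across retries

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value"),
                    (metadata, exception) -> {
                        // with retries > 0, this callback only sees errors that exhausted all retries
                        if (exception != null) {
                            exception.printStackTrace();
                        }
                    });
        }
    }
}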

Mathieu


On Tue, May 16, 2017 at 4:17 AM, Fathima Amara  wrote:

>
> Hi all,
>
> I am using Kafka 2.11-0.10.0.1 and Zookeeper 3.4.8.
> I have a cluster of 4 servers (A, B, C, D) running one Kafka broker on each of
> them, and one ZooKeeper server on server A. Data is initially produced from
> server A using a Kafka Producer, goes through servers B, C, D being
> subjected to processing, and finally reaches server A again (where it gets
> consumed using a Kafka Consumer).
>
> Topics created at the end of each process have 2 partitions with a
> replication factor of 3. Other configurations include:
> unclean.leader.election.enable=false
> acks=all
> retries=0
> I let the producer run for a while on server A, then kill one of the Kafka
> brokers in the cluster (B, C, D) while data processing takes place, and restart
> it. When consuming from the end of server A, I notice a considerable amount
> of data lost, which varies on each run! For example, on an input of 1 million
> events, 5930 events are lost.
>
> Is the reason for this the Kafka Producer not guaranteeing Exactly-once
> processing or is this due to some other reason? What other reasons cause
> data loss?
>


Re: [VOTE] 0.10.2.1 RC1

2017-04-17 Thread Mathieu Fenniak
+1 (non-binding)

Upgraded KS & KC applications to 0.10.2.1 RC1, successfully ran
application-level acceptance tests.

Mathieu


On Wed, Apr 12, 2017 at 6:25 PM, Gwen Shapira  wrote:

> Hello Kafka users, developers, client-developers, friends, romans,
> citizens, etc,
>
> This is the second candidate for release of Apache Kafka 0.10.2.1.
>
> This is a bug fix release and it includes fixes and improvements from 24
> JIRAs
> (including a few critical bugs).
>
> Release notes for the 0.10.2.1 release:
> http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc1/RELEASE_NOTES.html
>
> *** Please download, test and vote by Monday, April 17, 5:30 pm PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> Your help in validating this bugfix release is super valuable, so
> please take the time to test and vote!
>
> Suggested tests:
>  * Grab the source archive and make sure it compiles
>  * Grab one of the binary distros and run the quickstarts against them
>  * Extract and verify one of the site docs jars
>  * Build a sample against jars in the staging repo
>  * Validate GPG signatures on at least one file
>  * Validate the javadocs look ok
>  * The 0.10.2 documentation was updated for this bugfix release
> (especially upgrade, streams and connect portions) - please make sure
> it looks ok: http://kafka.apache.org/documentation.html
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc1/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/
>
> * Javadoc:
> http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc1/javadoc/
>
> * Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.1 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=e133f2ca57670e77f8114cc72dbc2f91a48e3a3b
>
> * Documentation:
> http://kafka.apache.org/0102/documentation.html
>
> * Protocol:
> http://kafka.apache.org/0102/protocol.html
>
> /**
>
> Thanks,
>
> Gwen Shapira
>


Re: [VOTE] 0.10.2.1 RC0

2017-04-10 Thread Mathieu Fenniak
Hi Gwen,

+1, looks good to me.  Tested broker upgrades, and connect & streams
applications.

Mathieu


On Fri, Apr 7, 2017 at 6:12 PM, Gwen Shapira  wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the first candidate for the release of Apache Kafka 0.10.2.1. This
> is a bug fix release and it includes fixes and improvements from 24 JIRAs
> (including a few critical bugs). See the release notes for more details:
>
> http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc0/RELEASE_NOTES.html
>
> *** Please download, test and vote by Thursday, 13 April, 8am PT ***
>
> Your help in validating this bugfix release is super valuable, so
> please take the time to test and vote!
>
> Few notes:
> 1. There are missing "Notable Changes" in the docs:
> https://github.com/apache/kafka/pull/2824
> I will review, merge and update the docs by Monday.
> 2. The last commit (KAFKA-4943 cherry-pick) did not pass system tests
> yet. We may need another RC if system tests fail tonight.
>
> Suggested tests:
>  * Grab the source archive and make sure it compiles
>  * Grab one of the binary distros and run the quickstarts against them
>  * Extract and verify one of the site docs jars
>  * Build a sample against jars in the staging repo
>  * Validate GPG signatures on at least one file
>  * Validate the javadocs look ok
>
> *
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc0/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging
>
> * Javadoc:
> http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc0/javadoc/
>
> * Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.1-rc0 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=d08115f05da0e39c7f75b45e05d6d14ad5baf71d
>
> * Documentation:
> http://kafka.apache.org/0102/documentation.html
>
> * Protocol:
> http://kafka.apache.org/0102/protocol.html
>
> Thanks,
> Gwen Shapira
>


Managing topic configuration w/ auto.create.topics.enable

2017-03-28 Thread Mathieu Fenniak
Hey Kafka Users,

When using a Kafka broker w/ auto.create.topics.enable set to true, how do
Kafka users generally manage configuration of those topics?  In
particular, cleanup.policy={compact/delete} can be a crucial configuration
value to get correct.

In my application, I have a couple Kafka Connect sources that generate all
the input to my Kafka brokers, across about 30 different topics.  All of
these specific topics are used in a KStreams app as tables, so they should
be configured w/ cleanup.policy=compact so that state-store restoration can
take place on them without losing data.  As far as I know, there's no way
for a Kafka producer (or more specifically a KC source) to indicate topic
configuration if/when the topic is auto-created, so it needs to be done
afterwards.

Other topics, such as KStreams repartition topics, should be
cleanup.policy=delete, and others such as KStream changelog topics should
be cleanup.policy=compact... but KStreams handles all this for me for
internal topics.

I'm envisioning the ideal solution as something like a scheduled task that
runs against my cluster, with a configuration that identifies and corrects
misconfigurations.  This could then roll out through different staging and
testing areas just like a general software change.  But developing such a
tool doesn't seem straightforward, since there currently isn't an API for
managing topic configuration either...
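
For the record, my current best idea looks roughly like the sketch below.  It
leans on Kafka's internal AdminUtils/ZkUtils helpers rather than a public API,
so treat it as a rough sketch only; the ZooKeeper address and topic names are
placeholders:

import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZkUtils;

public class CleanupPolicyEnforcer {
    public static void main(String[] args) {
        // Placeholder ZooKeeper connection string and timeouts.
        ZkUtils zkUtils = ZkUtils.apply("zookeeper-host:2181", 30000, 30000, false);
        try {
            // Placeholder list of auto-created source topics that must be compacted.
            for (String topic : new String[]{"source-topic-1", "source-topic-2"}) {
                Properties config = AdminUtils.fetchEntityConfig(zkUtils, "topics", topic);
                if (!"compact".equals(config.getProperty("cleanup.policy"))) {
                    config.setProperty("cleanup.policy", "compact");
                    AdminUtils.changeTopicConfig(zkUtils, topic, config);
                    System.out.println("Corrected cleanup.policy on " + topic);
                }
            }
        } finally {
            zkUtils.close();
        }
    }
}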

Mathieu


Re: Streams RocksDBException with no message?

2017-03-21 Thread Mathieu Fenniak
Thanks Guozhang.

For my part, turns out I was hitting ulimit on my open file descriptors.
Phew, easy to fix... once I figured it out. :-)

Mathieu


On Fri, Mar 17, 2017 at 4:14 PM, Guozhang Wang  wrote:

> Hi Mathieu,
>
> We have been aware of that for a long time and I have been looking into
> this issue; it turns out to be a known issue in RocksDB:
>
> https://github.com/facebook/rocksdb/issues/1688
>
> The corresponding fix (https://github.com/facebook/rocksdb/pull/1714)
> has been merged in master but is marked for
> v5.1.4 <https://github.com/facebook/rocksdb/releases/tag/v5.1.4>
> only, while the latest release is 5.1.2.
>
>
> Guozhang
>
>
> On Fri, Mar 17, 2017 at 10:27 AM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
> > Hey all,
> >
> > So... what does it mean to have a RocksDBException with a message that
> just
> > has a single character?  "e", "q", "]"... I've seen a few.  Has anyone
> seen
> > this before?
> >
> > Two example exceptions:
> > https://gist.github.com/mfenniak/c56beb6d5058e2b21df0309aea224f12
> >
> > Kafka Streams 0.10.2.0.  Both of these errors occurred during state store
> > initialization.  I'm running a single Kafka Streams thread per server,
> this
> > occurred on two servers about a half-hour apart.
> >
> > Mathieu
> >
>
>
>
> --
> -- Guozhang
>


Streams RocksDBException with no message?

2017-03-17 Thread Mathieu Fenniak
Hey all,

So... what does it mean to have a RocksDBException with a message that just
has a single character?  "e", "q", "]"... I've seen a few.  Has anyone seen
this before?

Two example exceptions:
https://gist.github.com/mfenniak/c56beb6d5058e2b21df0309aea224f12

Kafka Streams 0.10.2.0.  Both of these errors occurred during state store
initialization.  I'm running a single Kafka Streams thread per server, this
occurred on two servers about a half-hour apart.

Mathieu


Re: Pattern to create Task with dependencies (DI)

2017-03-14 Thread Mathieu Fenniak
Hey Petr,

I have the same issue.  But I just cope with it; I wire up default
dependencies directly in the connector and task constructors, expose them
through properties, and modify them to refer to mocks in my unit tests.

It's not a great approach, but it is simple.
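
Roughly, what I mean is something like the sketch below (DbClient and all the
names here are hypothetical, just to show the shape of it):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class MySourceTask extends SourceTask {

    /** Hypothetical dependency the task talks to; in tests this is replaced with a mock. */
    public interface DbClient {
        void connect(String url);
        List<SourceRecord> fetchNewRecords();
        void close();
    }

    // Default wiring happens right here, since Connect only gives us a no-arg constructor.
    private DbClient dbClient = new DbClient() {
        @Override public void connect(String url) { /* real connection logic */ }
        @Override public List<SourceRecord> fetchNewRecords() { return Collections.emptyList(); }
        @Override public void close() { }
    };

    // Exposed so a unit test can swap in a mock before calling start()/poll().
    void setDbClient(DbClient dbClient) {
        this.dbClient = dbClient;
    }

    @Override
    public String version() { return "1.0"; }

    @Override
    public void start(Map<String, String> props) {
        dbClient.connect(props.get("connection.url"));
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        return dbClient.fetchNewRecords();
    }

    @Override
    public void stop() {
        dbClient.close();
    }
}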

> Why does KConnect take control from me over how tasks are created?  ... Or
> instead of getTaskClass(), have createTask() on Connector which returns a
> task instance.


The problem is that in distributed mode, Kafka Connect will be creating
connectors on "arbitrary" nodes, and then very likely creating tasks on
nodes where the connector is not instantiated.  This is my understanding of
why Kafka Connect takes the control away from you.

I'm not saying it can't be done better, but it's definitely not without
reason. :-)

Mathieu


On Mon, Mar 13, 2017 at 4:42 AM, Petr Novak  wrote:

> Hello,
>
> Nobody has experience with Kafka Connect tasks with external dependencies?
>
>
>
> Thanks,
>
> Petr
>
>
>
> From: Petr Novak [mailto:oss.mli...@gmail.com]
> Sent: 23. února 2017 14:48
> To: users@kafka.apache.org
> Subject: Pattern to create Task with dependencies (DI)
>
>
>
> Hello,
>
> it seems that KConnect takes control over creating task instances and
> requires a no-arg constructor. What is the recommended pattern when I need to
> create a task which has a dependency, e.g. on some DB client, and I want to be
> able to pass in a mock in tests, preferably through the constructor?
>
> In Java I probably have to divide the actual task implementation into a
> separate class and use method forwarding from a task factory class. I assume
> that this factory, whose class would be passed to KConnect, would have to
> extend the abstract class, whose “this” I would have to pass to the
> implementing class as another dependency. I can’t figure out anything less ugly.
>
>
>
> In Scala it can be done elegantly through mixin composition which is
> invisible to KConnect.
>
>
>
> Why does KConnect take control from me over how tasks are created? Why doesn’t
> KConnect accept a factory class on which it would call a no-arg create method
> returning an instance of my task, so that I can control how it is
> created and which dependencies are created? Or instead of getTaskClass(),
> have createTask() on Connector which returns a task instance.
>
>
>
> Many thanks for advice,
>
> Petr
>
>


Re: KS coordinator dead -> CommitFailedException

2017-03-13 Thread Mathieu Fenniak
Hi Guozhang,

Thanks for the response.

I don't believe that my client has soft-failed, as my max.poll.interval.ms
is configured at 1800000 (30 minutes) and the client app shows "Committing
task StreamTask [N_N]" log messages within the past few (1-3) minutes of
the failure.

In terms of the "underlying infrastructure issue", I'm not really sure what
I have in mind.  I'm just assuming there is some reason why the client
disconnected from the broker, and I'm thinking it's not client-side.  A
long GC on the broker, rather than the client, might make more sense to
me...

Mathieu


On Fri, Mar 10, 2017 at 6:22 PM, Guozhang Wang  wrote:

> Hi Mathieu,
>
> Your understanding is correct, just a few remarks:
>
> 1. "Marking the coordinator ... dead for group ..." this is because the
> heartbeat protocol is actually mutual: i.e. from the consumer's point of
> view, if it has not received a response, it will mark the current
> remembered coordinator as "dead" and try to re-discovery the coordinator.
> Note this is not a WARN / ERROR log since it is actually normal in many
> cases, since as long as it has re-discovered the coordinator, which still
> acks itself as the member of the group (i.e. its first HB to the newly
> founded coordinator, which may just be the old one, returns no error code)
> then the consumer assumes that it still owns the assigned partitions and
> hence continue happy fetching.
>
> Only when the coordinator has really kicked itself out of the group, then
> upon re-connecting it may receive an error code in HB response and hence
> tries to re-join, and if it happens to piggy-back a commit request together
> you will see the "the group has already rebalanced .." error message.
>
> 2. For your case, it is likely that the broker is OK but the client has
> soft-failed, e.g. due to a long GC. Try increasing "max.poll.interval.ms" and see
> if it helps (that controls the maximum period a client can live without
> calling poll() to send another HB).
>
> 3. Re:
>
> > A deeper look into the specific state of the client, network, and broker
> at this time interval might suggest an underlying infrastructure issue that
> should be investigated.
>
> Could you share what underlying infrastructure issue you are referring to?
>
> BTW there is some related issue for the commit failed exception that has
> been fixed in trunk post 0.10.2:
> https://github.com/apache/kafka/commit/4db048d61206bc6efbd143d6293216b7cb4b86c5
>
>
> Guozhang
>
>
> On Fri, Mar 10, 2017 at 1:01 PM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
> > Attempting to answer my own questions... but I'd love if someone pointed
> > out any misunderstandings. :-)
> >
> > What causes the Kafka client to "Marking the coordinator ... dead for
> group
> > > ..."?  Is it untimely heartbeat messages?
> >
> >
> > Small number of causes, but for a running application and a Kafka broker
> > that didn't die, untimely heartbeats are probably occurring.  Maybe an
> > intermittent network blip, or high load (I/O?) on the network or broker.
> >
> > Any thoughts on what would cause the commit to fail after a "dead" like
> > > that, even though it is well within my max.poll.interval.ms?
> >
> >
> > My configuration had default session.timeout.ms (10 seconds) and
> > heartbeat.interval.ms (3 seconds).  If the intermittent network or load
> > issue disappeared but took 60 seconds, as suggested by my logs, my
> > consumer's session would have timed out on the server.
> >
> > Probably bumping up my session.timeout.ms is the immediate answer (and
> > possibly heartbeat.interval.ms, just to keep it reasonable).  A deeper
> > look
> > into the specific state of the client, network, and broker at this time
> > interval might suggest an underlying infrastructure issue that should be
> > investigated.
> >
> > Is the message "the group has already rebalanced and assigned the
> > > partitions to another member" really necessarily true?
> >
> >
> > The partitions are revoked from this app's consumer session.  If there are
> > no other consumers, the message's reference to "another member" is a tiny
> > bit misleading, but the point that they're not assigned to "me" is accurate.
> >
> > Mathieu
> >
> >
> > On Fri, Mar 10, 2017 at 11:15 AM, Mathieu Fenniak <
> > mathieu.fenn...@replicon.com> wrote:
> >
> > > Hey Kafka Users,
> > >
> > > I've been observing

Re: KS coordinator dead -> CommitFailedException

2017-03-10 Thread Mathieu Fenniak
Attempting to answer my own questions... but I'd love if someone pointed
out any misunderstandings. :-)

What causes the Kafka client to "Marking the coordinator ... dead for group
> ..."?  Is it untimely heartbeat messages?


Small number of causes, but for a running application and a Kafka broker
that didn't die, untimely heartbeats are probably occurring.  Maybe an
intermittent network blip, or high load (I/O?) on the network or broker.

Any thoughts on what would cause the commit to fail after a "dead" like
> that, even though it is well within my max.poll.interval.ms?


My configuration had default session.timeout.ms (10 seconds) and
heartbeat.interval.ms (3 seconds).  If the intermittent network or load
issue disappeared but took 60 seconds, as suggested by my logs, my
consumer's session would have timed out on the server.

Probably bumping up my session.timeout.ms is the immediate answer (and
possibly heartbeat.interval.ms, just to keep it reasonable).  A deeper look
into the specific state of the client, network, and broker at this time
interval might suggest an underlying infrastructure issue that should be
investigated.
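
Concretely, I'm thinking of something like the sketch below in the Streams
configuration.  The values are just a first guess, and I'm assuming (as I
believe is the case on 0.10.2) that plain consumer keys placed in the
StreamsConfig properties are forwarded to the embedded consumer; the app id and
bootstrap servers are placeholders:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class TimeoutTuning {
    public static Properties streamsProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");       // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.59.184:9092");

        // Consumer-level settings: tolerate ~30s of missed heartbeats before the
        // broker considers the session dead, with heartbeats at ~1/3 of that.
        props.put("session.timeout.ms", 30000);
        props.put("heartbeat.interval.ms", 10000);
        return props;
    }
}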

Is the message "the group has already rebalanced and assigned the
> partitions to another member" really necessarily true?


The partitions are revoked from this app's consumer session.  If there are no
other consumers, the message's reference to "another member" is a tiny bit
misleading, but the point that they're not assigned to "me" is accurate.

Mathieu


On Fri, Mar 10, 2017 at 11:15 AM, Mathieu Fenniak <
mathieu.fenn...@replicon.com> wrote:

> Hey Kafka Users,
>
> I've been observing a few instances of CommitFailedException (group has
> already rebalanced) that seem to happen well-within max.poll.interval.ms
> since the last commit.  In at least one specific case that I've looked at,
> between the last successful commit and the failed commit, there is a
> "Marking the coordinator ... dead for group ..." message.
>
> Log excerpts / sequence, including consumer config dump:
> https://gist.github.com/mfenniak/46113b4e3cbe35cc54ee103cb0515f34  This
> is a Kafka Streams application, with Kafka Streams & brokers on 0.10.2.0.
>
> What causes the Kafka client to "Marking the coordinator ... dead for
> group ..."?  Is it untimely heartbeat messages?
>
> Any thoughts on what would cause the commit to fail after a "dead" like
> that, even though it is well within my max.poll.interval.ms?
>
> Is the message "the group has already rebalanced and assigned the
> partitions to another member" really necessarily true?  Because in this
> specific instance, there was only one application instance running in this
> consumer group, so I'm a little skeptical.
>
> Thanks all,
>
> Mathieu
>
>


KS coordinator dead -> CommitFailedException

2017-03-10 Thread Mathieu Fenniak
Hey Kafka Users,

I've been observing a few instances of CommitFailedException (group has
already rebalanced) that seem to happen well-within max.poll.interval.ms
since the last commit.  In at least one specific case that I've looked at,
between the last successful commit and the failed commit, there is a
"Marking the coordinator ... dead for group ..." message.

Log excerpts / sequence, including consumer config dump:
https://gist.github.com/mfenniak/46113b4e3cbe35cc54ee103cb0515f34  This is
a Kafka Streams application, with Kafka Streams & brokers on 0.10.2.0.

What causes the Kafka client to "Marking the coordinator ... dead for group
..."?  Is it untimely heartbeat messages?

Any thoughts on what would cause the commit to fail after a "dead" like
that, even though it is well within my max.poll.interval.ms?

Is the message "the group has already rebalanced and assigned the
partitions to another member" really necessarily true?  Because in this
specific instance, there was only one application instance running in this
consumer group, so I'm a little skeptical.

Thanks all,

Mathieu


Re: Kafka offset being reset

2017-02-28 Thread Mathieu Fenniak
Hi Vishnu,

I'd suggest you take a look at the broker configuration value
"offsets.retention.minutes".

The consumer offsets are stored in the __consumer_offsets topic.
 __consumer_offsets is a compacted topic (cleanup.policy=compact), where
the key is the combination of the consumer group, the topic, and the
partition, and the value is the offset, a timestamp it was committed, and
some metadata.

Because it is a compacted topic, it would normally never lose data.  But
imagine if you misconfigured a consumer with the wrong group.id and it
started committing offsets before you shut it down and corrected it.  Well,
those offsets would "normally" live forever because they're in a compacted
topic, but effectively they need to be "garbage collected" at some point.

"offsets.retention.minutes" is like a garbage collector for the
__consumer_offsets topic.  Every "offsets.retention.check.interval.ms",
Kafka scans the topic and adds delete tombstones for any offset that
hasn't been seen in "offsets.retention.minutes".  Compaction will then
eventually remove those values completely.  This allows offsets to be
cleaned up when they're no longer needed.

The default value of "offsets.retention.minutes" is 1440 minutes (24 hours).
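
So if you want consumer offsets to survive longer idle periods, you'd bump that
in the broker's server.properties; for example (7 days here is just an
illustrative value, not a recommendation):

offsets.retention.minutes=10080
offsets.retention.check.interval.ms=600000

(600000 ms, i.e. 10 minutes, is already the default check interval; it's shown
only for completeness.)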

Mathieu


On Mon, Feb 27, 2017 at 4:50 PM, Vishnu Gopal  wrote:

> Team,
>
>
> I came across an issue where the offset position is being reset and the
> consumer app is receiving the same message at the same offset repeatedly.
> This is happening every 3 or 4 days. I have purged the topic for now
> and re-started the app, but can anybody give me a direction on how to
> replicate this issue and how to resolve it? Any help is great.
>
>
> Thanks,
>
> Vishnu
>


Re: Question about messages in __consumer_offsets topic

2017-02-22 Thread Mathieu Fenniak
Hi Jun,

I ran into the same question today (see thread, subject: Consumer / Streams
causes deletes in __consumer_offsets?), and here's what Eno and Guozhang
helped me understand:

There are broker-level configuration values called
"offsets.retention.minutes" and "offsets.retention.check.interval.ms".
Every "offsets.retention.check.interval.ms", consumer group offsets that
haven't been published in "offsets.retention.minutes" have a null record
published in the __consumer_offsets topic, so that the eventual compaction
of the topic will clean them up. :-)

Mathieu


On Wed, Feb 22, 2017 at 6:59 PM, Jun MA  wrote:

> Hi Todd,
>
> Thank you so much for your reply. I assume that the broker will produce
> the tombstone to the __consumer_offsets topic when the offset expires? I’m
> curious how the broker notices the offset has expired. Does it store the offset
> message in memory and periodically check whether some have expired?
>
> Thanks,
> Jun
>
> > On Feb 22, 2017, at 4:37 PM, Todd Palino  wrote:
> >
> > __consumer_offsets is a log-compacted topic, and a NULL body indicates a
> > delete tombstone. So it means to delete the entry that matches the key
> > (group, topic, partition tuple).
> >
> > -Todd
> >
> >
> >
> > On Wed, Feb 22, 2017 at 3:50 PM, Jun MA  wrote:
> >
> >> Hi guys,
> >>
> >> I’m trying to consume from __consumer_offsets topic to get exact
> committed
> >> offset of each consumer. Normally I can see messages like:
> >>
> >> [eds-els-recopp-jenkins-01-5651,eds-incre-staging-1,0]::[
> >> OffsetMetadata[29791925,NO_METADATA],CommitTime
> >> 1487090167367,ExpirationTime 1487176567367],
> >>
> >> which make sense to me. But sometimes I see messages like:
> >>
> >> [eds-elssearchindex-curiosity-stg-10892,eds-incre-v2-staging
> -els,0]::NULL.
> >>
> >> Can someone explains what is NULL means here and why a NULL value get
> >> published to __consumer_offsets?
> >>
> >> We’re running kafka 0.9.0.1 and we use org.apache.kafka.common.
> >> serialization.ByteArrayDeserializer and GroupMetadataManager.OffsetsMe
> ssageFormatter
> >> to parse the message.
> >>
> >> Thanks,
> >> Jun
> >
> >
> >
> >
> > --
> > *Todd Palino*
> > Staff Site Reliability Engineer
> > Data Infrastructure Streaming
> >
> >
> >
> > linkedin.com/in/toddpalino
>
>


Re: Consumer / Streams causes deletes in __consumer_offsets?

2017-02-22 Thread Mathieu Fenniak
Thanks Guozhang, that clarifies the Streams behavior.

I'm imagining that a Streams application might only commit partition
offsets that have changed, and therefore a partition that is idle for
greater than offsets.retention.minutes might lose its offsets when the app
restarts.  Does that seem plausible?

That theory seems to be supported by a brief look at the code;
StreamTask#commitOffsets() sends only the consumed offsets since the
last commitOffsets
call.

This would definitely match behavior I've been puzzled about for a while.
I deploy my Streams app, shove a lot of data at it to see how it is
performing and outputting, then go and do some more development work.
After a day or two (or maybe a weekend), I redeploy the app, and it pops
back to the beginning of all the topics, surprising the heck out of me and
making me think I broke something. :-)

Increasing offsets.retention.minutes seems like the easy immediate fix.  It
might be ideal if a Streams app kept idle offsets refreshed occasionally,
but it's not too likely to impact more realistic use-cases.

Mathieu


On Wed, Feb 22, 2017 at 2:18 PM, Guozhang Wang  wrote:

> Hi Mathieu,
>
> In Streams the consumer config "enable.auto.commit" is always forced to
> false, and a separate "commit.interval.ms" is set. With that even if you
> do
> not have any data processed the commit operation will be triggered after
> that configured period of time.
>
>
> Guozhang
>
>
> On Wed, Feb 22, 2017 at 8:41 AM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
> > Hi Eno,
> >
> > Thanks for the quick reply.  I think that probably does match the data
> I'm
> > seeing.  This surprises me a bit because my streams app was only offline
> > for a few minutes, but ended up losing its offset.
> >
> > My interpretation is that the source partition had been idle for 24
> hours,
> > streams doesn't commit offsets for idle partitions, and so the
> > default/unconfigured offset retention of 24 hours had expired.
> >
> > I'll work around this by bumping up my offset retention.  Thanks!
> >
> > Mathieu
> >
> >
> > On Wed, Feb 22, 2017 at 9:22 AM, Eno Thereska 
> > wrote:
> >
> > > Hi Mathieu,
> > >
> > > It could be that the offset retention period has expired. See this:
> > > http://stackoverflow.com/questions/39131465/how-does-an-offset-expire-for-an-apache-kafka-consumer-group
> > >
> > > Thanks
> > > Eno
> > >
> > > > On 22 Feb 2017, at 16:08, Mathieu Fenniak <
> > mathieu.fenn...@replicon.com>
> > > wrote:
> > > >
> > > > Hey users,
> > > >
> > > > What causes delete tombstones (value=null) to be sent to the
> > > > __consumer_offsets topic?
> > > >
> > > > I'm observing that a Kafka Streams application that is restarted
> after
> > a
> > > > crash appears to be reprocessing messages from the beginning of a
> > topic.
> > > > I've dumped the __consumer_offsets topic and found that after the
> > > restart,
> > > > messages with a null value are being sent to __consumer_offsets.
> > > >
> > > > I do see that the ConsumerConfig for my StreamThread consumer has
> > > > auto.offset.reset=earliest.  But my understanding of this
> configuration
> > > is
> > > > that it only applies when the offset isn't available, but there are
> > > > definitely offsets for this consumer group stored in
> > __consumer_offsets.
> > > >
> > > > Here's the consumer config for the streams app:
> > > >
> > > > ConsumerConfig values:
> > > >  auto.commit.interval.ms = 5000
> > > >  auto.offset.reset = earliest
> > > >  bootstrap.servers = [10.10.59.184:9092]
> > > >  check.crcs = true
> > > >  client.id =
> > > > timesheet-list-2d7a7f37-f41a-46b0-a1bb-d47f773012f6-
> > > StreamThread-1-consumer
> > > >  connections.max.idle.ms = 540000
> > > >  enable.auto.commit = false
> > > >  exclude.internal.topics = true
> > > >  fetch.max.bytes = 52428800
> > > >  fetch.max.wait.ms = 500
> > > >  fetch.min.bytes = 1
> > > >  group.id = timesheet-list
> > > >  heartbeat.interval.ms = 3000
> > > >  interceptor.classes = null
> > > >

Re: Consumer / Streams causes deletes in __consumer_offsets?

2017-02-22 Thread Mathieu Fenniak
Hi Eno,

Thanks for the quick reply.  I think that probably does match the data I'm
seeing.  This surprises me a bit because my streams app was only offline
for a few minutes, but ended up losing its offset.

My interpretation is that the source partition had been idle for 24 hours,
streams doesn't commit offsets for idle partitions, and so the
default/unconfigured offset retention of 24 hours had expired.

I'll work around this by bumping up my offset retention.  Thanks!

Mathieu


On Wed, Feb 22, 2017 at 9:22 AM, Eno Thereska 
wrote:

> Hi Mathieu,
>
> It could be that the offset retention period has expired. See this:
> http://stackoverflow.com/questions/39131465/how-does-an-offset-expire-for-an-apache-kafka-consumer-group
>
> Thanks
> Eno
>
> > On 22 Feb 2017, at 16:08, Mathieu Fenniak 
> wrote:
> >
> > Hey users,
> >
> > What causes delete tombstones (value=null) to be sent to the
> > __consumer_offsets topic?
> >
> > I'm observing that a Kafka Streams application that is restarted after a
> > crash appears to be reprocessing messages from the beginning of a topic.
> > I've dumped the __consumer_offsets topic and found that after the
> restart,
> > messages with a null value are being sent to __consumer_offsets.
> >
> > I do see that the ConsumerConfig for my StreamThread consumer has
> > auto.offset.reset=earliest.  But my understanding of this configuration
> is
> > that it only applies when the offset isn't available, but there are
> > definitely offsets for this consumer group stored in __consumer_offsets.
> >
> > Here's the consumer config for the streams app:
> >
> > ConsumerConfig values:
> >  auto.commit.interval.ms = 5000
> >  auto.offset.reset = earliest
> >  bootstrap.servers = [10.10.59.184:9092]
> >  check.crcs = true
> >  client.id =
> > timesheet-list-2d7a7f37-f41a-46b0-a1bb-d47f773012f6-
> StreamThread-1-consumer
> >  connections.max.idle.ms = 540000
> >  enable.auto.commit = false
> >  exclude.internal.topics = true
> >  fetch.max.bytes = 52428800
> >  fetch.max.wait.ms = 500
> >  fetch.min.bytes = 1
> >  group.id = timesheet-list
> >  heartbeat.interval.ms = 3000
> >  interceptor.classes = null
> >  key.deserializer = class
> > org.apache.kafka.common.serialization.ByteArrayDeserializer
> >  max.partition.fetch.bytes = 1048576
> >  max.poll.interval.ms = 1800000
> >  max.poll.records = 1000
> >  metadata.max.age.ms = 300000
> >  metric.reporters = []
> >  metrics.num.samples = 2
> >  metrics.recording.level = INFO
> >  metrics.sample.window.ms = 30000
> >  partition.assignment.strategy =
> > [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
> >  receive.buffer.bytes = 65536
> >  reconnect.backoff.ms = 50
> >  request.timeout.ms = 1801000
> >  retry.backoff.ms = 100
> >  sasl.jaas.config = null
> >  sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >  sasl.kerberos.min.time.before.relogin = 60000
> >  sasl.kerberos.service.name = null
> >  sasl.kerberos.ticket.renew.jitter = 0.05
> >  sasl.kerberos.ticket.renew.window.factor = 0.8
> >  sasl.mechanism = GSSAPI
> >  security.protocol = PLAINTEXT
> >  send.buffer.bytes = 131072
> >  session.timeout.ms = 10000
> >  ssl.cipher.suites = null
> >  ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> >  ssl.endpoint.identification.algorithm = null
> >  ssl.key.password = null
> >  ssl.keymanager.algorithm = SunX509
> >  ssl.keystore.location = null
> >  ssl.keystore.password = null
> >  ssl.keystore.type = JKS
> >  ssl.protocol = TLS
> >  ssl.provider = null
> >  ssl.secure.random.implementation = null
> >  ssl.trustmanager.algorithm = PKIX
> >  ssl.truststore.location = null
> >  ssl.truststore.password = null
> >  ssl.truststore.type = JKS
> >  value.deserializer = class
> > org.apache.kafka.common.serialization.ByteArrayDeserializer
>
>


Consumer / Streams causes deletes in __consumer_offsets?

2017-02-22 Thread Mathieu Fenniak
Hey users,

What causes delete tombstones (value=null) to be sent to the
__consumer_offsets topic?

I'm observing that a Kafka Streams application that is restarted after a
crash appears to be reprocessing messages from the beginning of a topic.
I've dumped the __consumer_offsets topic and found that after the restart,
messages with a null value are being sent to __consumer_offsets.

I do see that the ConsumerConfig for my StreamThread consumer has
auto.offset.reset=earliest.  But my understanding of this configuration is
that it only applies when the offset isn't available, but there are
definitely offsets for this consumer group stored in __consumer_offsets.

Here's the consumer config for the streams app:

ConsumerConfig values:
  auto.commit.interval.ms = 5000
  auto.offset.reset = earliest
  bootstrap.servers = [10.10.59.184:9092]
  check.crcs = true
  client.id =
timesheet-list-2d7a7f37-f41a-46b0-a1bb-d47f773012f6-StreamThread-1-consumer
  connections.max.idle.ms = 540000
  enable.auto.commit = false
  exclude.internal.topics = true
  fetch.max.bytes = 52428800
  fetch.max.wait.ms = 500
  fetch.min.bytes = 1
  group.id = timesheet-list
  heartbeat.interval.ms = 3000
  interceptor.classes = null
  key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
  max.partition.fetch.bytes = 1048576
  max.poll.interval.ms = 1800000
  max.poll.records = 1000
  metadata.max.age.ms = 300000
  metric.reporters = []
  metrics.num.samples = 2
  metrics.recording.level = INFO
  metrics.sample.window.ms = 30000
  partition.assignment.strategy =
[org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
  receive.buffer.bytes = 65536
  reconnect.backoff.ms = 50
  request.timeout.ms = 1801000
  retry.backoff.ms = 100
  sasl.jaas.config = null
  sasl.kerberos.kinit.cmd = /usr/bin/kinit
  sasl.kerberos.min.time.before.relogin = 60000
  sasl.kerberos.service.name = null
  sasl.kerberos.ticket.renew.jitter = 0.05
  sasl.kerberos.ticket.renew.window.factor = 0.8
  sasl.mechanism = GSSAPI
  security.protocol = PLAINTEXT
  send.buffer.bytes = 131072
  session.timeout.ms = 10000
  ssl.cipher.suites = null
  ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
  ssl.endpoint.identification.algorithm = null
  ssl.key.password = null
  ssl.keymanager.algorithm = SunX509
  ssl.keystore.location = null
  ssl.keystore.password = null
  ssl.keystore.type = JKS
  ssl.protocol = TLS
  ssl.provider = null
  ssl.secure.random.implementation = null
  ssl.trustmanager.algorithm = PKIX
  ssl.truststore.location = null
  ssl.truststore.password = null
  ssl.truststore.type = JKS
  value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-02-15 Thread Mathieu Fenniak
On Wed, Feb 15, 2017 at 5:04 PM, Matthias J. Sax 
wrote:

> - We also removed method #topologyBuilder() from KStreamBuilder because
> we think #transform() should provide all functionality you need to
> mix-an-match Processor API and DSL. If there is any further concern
> about this, please let us know.
>

Hi Matthias,

Yes, I'm sorry I didn't respond sooner, but I still have a lot of concerns
about this.  You're correct to point out that transform() can be used for
some of the output situations I pointed out; albeit it seems somewhat
awkward to do so in a "transform" method; what do you do with the retval?

The PAPI processors I use in my KStreams app are all functioning on KTable
internals.  I wouldn't be able to convert them to process()/transform().

What's the harm in permitting both APIs to be used in the same application?

Mathieu


Re: [VOTE] 0.10.2.0 RC2

2017-02-15 Thread Mathieu Fenniak
+1 (non-binding)

Still looks as good as RC0 did for my streams workload. :-)

Mathieu


On Wed, Feb 15, 2017 at 1:23 PM, Magnus Edenhill  wrote:

> Verified with librdkafka v0.9.4-RC1.
>
> 2017-02-15 9:18 GMT-08:00 Tom Crayford :
>
> > Heroku tested this with our usual round of performance benchmarks, and
> > there seem to be no notable regressions in this RC that we can see (for a
> > sample on earlier regressions we found using these benchmarks during the
> > 0.10.0.0 release,
> > https://engineering.heroku.com/blogs/2016-05-27-apache-
> > kafka-010-evaluating-performance-in-distributed-systems/
> > is a decent writeup)
> >
> > On Tue, Feb 14, 2017 at 6:39 PM, Ewen Cheslack-Postava <
> e...@confluent.io>
> > wrote:
> >
> > > Hello Kafka users, developers and client-developers,
> > >
> > > This is the third candidate for release of Apache Kafka 0.10.2.0.
> > >
> > > This is a minor version release of Apache Kafka. It includes 19 new
> KIPs.
> > > See the release notes and release plan (https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.2.0) for more details. A few
> > > feature
> > > highlights: SASL-SCRAM support, improved client compatibility to allow
> > use
> > > of clients newer than the broker, session windows and global tables in
> > the
> > > Kafka Streams API, single message transforms in the Kafka Connect
> > > framework.
> > >
> > > Important note: in addition to the artifacts generated using JDK7 for
> > Scala
> > > 2.10 and 2.11, this release also includes experimental artifacts built
> > > using JDK8 for Scala 2.12.
> > >
> > > Important code changes since RC1 (non-docs, non system tests):
> > >
> > > KAFKA-4756; The auto-generated broker id should be passed to
> > > MetricReporter.configure
> > > KAFKA-4761; Fix producer regression handling small or zero batch size
> > >
> > > Release notes for the 0.10.2.0 release:
> > > http://home.apache.org/~ewencp/kafka-0.10.2.0-rc2/RELEASE_NOTES.html
> > >
> > > *** Please download, test and vote by February 17th 5pm ***
> > >
> > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > http://kafka.apache.org/KEYS
> > >
> > > * Release artifacts to be voted upon (source and binary):
> > > http://home.apache.org/~ewencp/kafka-0.10.2.0-rc2/
> > >
> > > * Maven artifacts to be voted upon:
> > > https://repository.apache.org/content/groups/staging/
> > >
> > > * Javadoc:
> > > http://home.apache.org/~ewencp/kafka-0.10.2.0-rc2/javadoc/
> > >
> > > * Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.0 tag:
> > > https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=5712b489038b71ed8d5a679856d1dfaa925eadc1
> > >
> > >
> > > * Documentation:
> > > http://kafka.apache.org/0102/documentation.html
> > >
> > > * Protocol:
> > > http://kafka.apache.org/0102/protocol.html
> > >
> > > * Successful Jenkins builds for the 0.10.2 branch:
> > > Unit/integration tests: https://builds.apache.org/job/kafka-0.10.2-jdk7/77/
> > > System tests: https://jenkins.confluent.io/job/system-test-kafka-0.10.2/29/
> > >
> > > /**
> > >
> > > Thanks,
> > > Ewen
> > >
> >
>


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-02-14 Thread Mathieu Fenniak
On Tue, Feb 14, 2017 at 9:37 AM, Damian Guy  wrote:

> > And about printing the topology for debuggability: I agrees this is a
> > > potential drawback, and I'd suggest maintain some functionality to
> build
> > a
> > > "dry topology" as Mathieu suggested; the difficulty is that, internally
> > we
> > > need a different "copy" of the topology for each thread so that they
> will
> > > not share any states, so we cannot directly pass in the topology into
> > > KafkaStreams instead of the topology builder. So how about adding a
> > > `ToplogyBuilder#toString` function which calls `build()` internally
> then
> > > prints the built dry topology?
> > >
> >
> > Well, this sounds better than KafkaStreams#toString() in that it doesn't
> > require a running processor.  But I'd really love to have a simple object
> > model for the topology, not a string output, so that I can output my own
> > debug format.  I currently have that in the form of
> > TopologyBuilder#nodeGroups() & TopologyBuilder#build(Integer).
> >
>
> How about a method on TopologyBuilder that you pass a functional interface
> to and it gets called once for each ProcessorTopology? We may need to add a
> new public class that represents the topology (i'm not sure we want to
> 'expose` ProcessorTopology as public).
>

That sounds awesome.

Mathieu


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-02-14 Thread Mathieu Fenniak
On Tue, Feb 14, 2017 at 1:14 AM, Guozhang Wang  wrote:

> Some thoughts on the mixture usage of DSL / PAPI:
>
> There were some suggestions on mixing the usage of DSL and PAPI:
> https://issues.apache.org/jira/browse/KAFKA-3455, and after thinking it a
> bit more carefully, I'd rather not recommend users following this pattern,
> since in DSL this can always be achieved in process() / transform(). Hence
> I think it is okay to prevent such patterns in the new APIs. And for the
> same reasons, I think we can remove KStreamBuilder#newName() from the
> public APIs.
>

I'm not sure that things can always be achieved by process() /
transform()... there are some limitations to these APIs.  You can't output
from a process(), and you can't output multiple records or branching logic
from a transform(); these are things that can be done in the PAPI quite
easily.

I definitely understand a preference for using process()/transform() where
possible, but, they don't seem to replace the PAPI.

I would love to be operating in a world that was entirely DSL.  But the DSL
is limited, and it isn't extensible (... by any stable API).  I don't mind
reaching into internals today and making my own life difficult to extend
it, and I'd continue to find a way to do that if you made the APIs distinct
and split, but I'm just expressing my preference that you not do that. :-)

And about printing the topology for debuggability: I agrees this is a
> potential drawback, and I'd suggest maintain some functionality to build a
> "dry topology" as Mathieu suggested; the difficulty is that, internally we
> need a different "copy" of the topology for each thread so that they will
> not share any states, so we cannot directly pass in the topology into
> KafkaStreams instead of the topology builder. So how about adding a
> `ToplogyBuilder#toString` function which calls `build()` internally then
> prints the built dry topology?
>

Well, this sounds better than KafkaStreams#toString() in that it doesn't
require a running processor.  But I'd really love to have a simple object
model for the topology, not a string output, so that I can output my own
debug format.  I currently have that in the form of
TopologyBuilder#nodeGroups() & TopologyBuilder#build(Integer).

Mathieu


Re: Kafka streams: Getting a state store associated to a processor

2017-02-13 Thread Mathieu Fenniak
Hi Adam,

If you increase the number of partitions in the topic "topic1" after the
state store is created, you'd need to manually increase the number of
partitions in the "app1-store1-changelog" topic as well.  Or remove the
topic and let KS recreate it next run.  But, either way, hopefully you
don't need the data in it, 'cause it won't match the partitioning of the
input topic. :-)

Mathieu


On Mon, Feb 13, 2017 at 11:59 AM, Adam Warski  wrote:

> Hello,
>
> I have a simple example (or so it would seem) of a stream processor which
> uses a persistent state store. Testing on one local Kafka (0.10.1.1) node,
> this starts up without problems for a topic with 1 partition. However, if I
> create a topic with 3 partitions I’m getting the following exception
> shortly after the init() method of the Processor is called (init completes
> without problems):
>
> 2017-02-13 18:41:18 ERROR StreamThread:666 - stream-thread
> [StreamThread-1] Failed to create an active task 0_1:
> org.apache.kafka.streams.errors.StreamsException: task [0_1] Store
> store1's change log (app1-store1-changelog) does not contain partition 1
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.register(ProcessorStateManager.java:185)
> at org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.register(ProcessorContextImpl.java:123)
> at org.apache.kafka.streams.state.internals.RocksDBStore.
> init(RocksDBStore.java:169)
> at org.apache.kafka.streams.state.internals.
> MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
> at org.apache.kafka.streams.processor.internals.AbstractTask.
> initializeStateStores(AbstractTask.java:81)
> at org.apache.kafka.streams.processor.internals.
> StreamTask.<init>(StreamTask.java:119)
>
> The code is essentially:
>
> StateStoreSupplier testStore = Stores.create("store1")
> .withStringKeys()
> .withStringValues()
> .persistent()
> .build();
>
> TopologyBuilder builder = new TopologyBuilder();
>
> builder.addSource("source", "topic1")
> .addProcessor("process", TestProcessor::new, "source")
> .addStateStore(testStore, "process”);
>
> KafkaStreams streams = new KafkaStreams(builder, props);
> streams.start();
>
> public static class TestProcessor implements Processor {
> @Override
> public void init(ProcessorContext context) {
> context.getStateStore("store1");
> System.out.println("Initialized");
> }
> }
>
> Full source here: https://gist.github.com/adamw/b5c69f86d8688da23afebd095683faaa
> Full stack trace: https://gist.github.com/adamw/f72cdf0c2f0d67425ed9c103a327f3bf
>
> I would be grateful for any pointers!
> Adam
>
> --
> Adam Warski
>
> http://twitter.com/#!/adamwarski 
> http://www.softwaremill.com 
> http://www.warski.org 
>


Re: Partitioning behavior of Kafka Streams without explicit StreamPartitioner

2017-02-10 Thread Mathieu Fenniak
Well, I think what you're doing is unusual for sure.  The Streams API is
really about transforming streams of data from input to output... so
therefore the API doesn't have an injection point like you're looking for.
I'd say it's intentional (I'm just a user though).

If I were in your shoes, I'd probably decouple the applications -- make the
portion that accepts messages over HTTP and produces to a Kafka topic one
application, and make the stream processor another app.  That would allow
them to be deployed and scaled separately (eg. they may not always require
the same hardware, capacity, yada yada).

Mathieu


On Fri, Feb 10, 2017 at 2:22 PM, Steven Schlansker <
sschlans...@opentable.com> wrote:

>
> > On Feb 10, 2017, at 1:09 PM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
> >
> > Hey Steven,
> >
> > If you have one KStream, and you want to produce to a topic that is read
> by
> > another KStream, you'd use the ".through" method of the first KStream.
> > ".through" both outputs to a topic and returns a KStream that reads from
> > that topic.  (".to" just outputs to a topic)
> >
> > If you want to produce data from a different application into a Kafka
> > Streams app, you'd be using the Kafka Producer without the Kafka Streams
> > library.  (Or Kafka Connect, or any other way to produce to Kafka).  You
> > could use any implementation/configuration of partitioner.class that
> you'd
> > want to.
> >
>
> I am writing essentially a distributed state machine using Kafka.  I get
> the sense
> that most users / examples are using it more for ETL or streaming data
> processing,
> which is probably why my need here seems a little strange.
>
> I expect to accept messages over HTTP and then wish to offer it into the
> processing
> stream.  The only actor in my model currently *is* the Kafka Streams app,
> there is no
> upstream or downstream collaborator to feed it data over Kafka (yet).
>
> I get that I can use the normal Producer with the partitioner below, but I
> consider
> the code a little ugly and probably could be improved.  Is the lack of a
> Kafka Streams
> level produce intentional?  Am I thinking about the problem wrong?
>
> > Mathieu
> >
> >
> > On Fri, Feb 10, 2017 at 1:50 PM, Steven Schlansker <
> > sschlans...@opentable.com> wrote:
> >
> >> So then I guess my problem really is that I am operating at two
> different
> >> levels of abstraction.
> >> How do I produce to a KStream?  I could imagine a method:
> >>
> >> public void KStream.put(K, V, Callback?);
> >>
> >> but I don't see anything like that.  Nor do the "QueryableStoreTypes"
> >> really seem like what I want either.
> >>
> >> Currently I do this, which I feel isn't the most elegant solution:
> >>
> >> public class MyPartitioner implements Partitioner,
> >> StreamPartitioner<String, ChatMessage> {
> >>@Override
> >>public int partition(String topic, Object key, byte[] keyBytes,
> Object
> >> value, byte[] valueBytes, Cluster cluster) {
> >>final List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
> >>return partition0(keyBytes, value, partitions.size());
> >>}
> >>
> >>@Override
> >>public Integer partition(String key, ChatMessage value, int
> >> numPartitions) {
> >>return partition0(null, value, numPartitions);
> >>}
> >>
> >>@VisibleForTesting
> >>int partition0(byte[] keyBytes, Object value, final int
> numPartitions)
> >> {
> >>if (value instanceof ChatMessage) {
> >>return messagePartition((ChatMessage) value, numPartitions);
> >>}
> >>// same as DefaultPartitioner except we assume no null keys
> >>return Utils.toPositive(Utils.murmur2(keyBytes)) %
> numPartitions;
> >>}
> >> }
> >>
> >> If I could produce to a StreamPartition'd KStream, then I could work
> only
> >> at one layer.
> >> That would have the additional benefit that I would no longer need to
> >> configure and
> >> own my own KafkaProducers.
> >>
> >>> On Feb 9, 2017, at 8:25 PM, Matthias J. Sax 
> >> wrote:
> >>>
> >>> It's by design.
> >>>
> >>> The reason it, that Streams uses a single producer to write to
> different
> >>> output topic. As different output

Re: Partitioning behavior of Kafka Streams without explicit StreamPartitioner

2017-02-10 Thread Mathieu Fenniak
Hey Steven,

If you have one KStream, and you want to produce to a topic that is read by
another KStream, you'd use the ".through" method of the first KStream.
 ".through" both outputs to a topic and returns a KStream that reads from
that topic.  (".to" just outputs to a topic)
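
For example, something like this sketch (the topic names and string serdes are
just placeholders):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class ThroughExample {
    public static KStreamBuilder buildTopology() {
        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> input = builder.stream(
                Serdes.String(), Serdes.String(), "input-topic");   // placeholder topic

        // .through() writes every record to "intermediate-topic" AND returns a new
        // KStream reading from that topic, so downstream processing continues from it.
        KStream<String, String> rereadFromTopic = input
                .mapValues(v -> v.toUpperCase())
                .through(Serdes.String(), Serdes.String(), "intermediate-topic");

        // .to() only writes; it terminates this branch of the topology.
        rereadFromTopic.to(Serdes.String(), Serdes.String(), "output-topic");
        return builder;
    }
}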

If you want to produce data from a different application into a Kafka
Streams app, you'd be using the Kafka Producer without the Kafka Streams
library.  (Or Kafka Connect, or any other way to produce to Kafka).  You
could use any implementation/configuration of partitioner.class that you'd
want to.

Mathieu


On Fri, Feb 10, 2017 at 1:50 PM, Steven Schlansker <
sschlans...@opentable.com> wrote:

> So then I guess my problem really is that I am operating at two different
> levels of abstraction.
> How do I produce to a KStream?  I could imagine a method:
>
> public void KStream.put(K, V, Callback?);
>
> but I don't see anything like that.  Nor do the "QueryableStoreTypes"
> really seem like what I want either.
>
> Currently I do this, which I feel isn't the most elegant solution:
>
> public class MyPartitioner implements Partitioner,
> StreamPartitioner<String, ChatMessage> {
> @Override
> public int partition(String topic, Object key, byte[] keyBytes, Object
> value, byte[] valueBytes, Cluster cluster) {
> final List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
> return partition0(keyBytes, value, partitions.size());
> }
>
> @Override
> public Integer partition(String key, ChatMessage value, int
> numPartitions) {
> return partition0(null, value, numPartitions);
> }
>
> @VisibleForTesting
> int partition0(byte[] keyBytes, Object value, final int numPartitions)
> {
> if (value instanceof ChatMessage) {
> return messagePartition((ChatMessage) value, numPartitions);
> }
> // same as DefaultPartitioner except we assume no null keys
> return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
> }
> }
>
> If I could produce to a StreamPartition'd KStream, then I could work only
> at one layer.
> That would have the additional benefit that I would no longer need to
> configure and
> own my own KafkaProducers.
>
> > On Feb 9, 2017, at 8:25 PM, Matthias J. Sax 
> wrote:
> >
> > It's by design.
> >
> > The reason it, that Streams uses a single producer to write to different
> > output topic. As different output topics might have different key and/or
> > value types, the producer is instantiated with byte[] as key and value
> > type, and Streams serialized the data before handing it to the producer
> > -- Streams knows the topology and can pick the right serializer
> > according to the current key and value type.
> >
> > That's the reason why KStream#to() has an overload allowing to specify a
> > custom StreamPartitioner that will be called by Streams (not the
> > producer) to compute the partition before serializing the data. For this
> > case, the partition (to write the data into) is given to the producer
> > directly and the producer does not call it's own partitioner.
> >
> >
> > -Matthias
> >
> >
> > On 2/9/17 3:49 PM, Steven Schlansker wrote:
> >> Hi, I discovered what I consider to be really confusing behavior --
> wondering if this is by design or a bug.
> >>
> >> The Kafka Partitioner interface:
> >> public int partition(String topic, Object key, byte[] keyBytes, Object
> value, byte[] valueBytes, Cluster cluster);
> >>
> >> has both "Object value" and "byte[] valueBytes" provided.  I naïvely
> assumed that "value" would be the pre-serialized
> >> domain object.
> >>
> >> I set up a KStream:
> >>
> >>
> >> builder.stream(Serdes.String(), requestSerde, requestTopic)
> >>   .mapValues(this::requestToMessage)
> >>   .to(Serdes.String(), messageSerde, messageTopic);
> >>
> >>
> >> I produce to the "messageTopic" both via this Stream as well as by a
> normal KafkaProducer.
> >>
> >> I thought this should be sufficient to partition both ways:
> >>
> >> props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
> MessagePartitioner.class);
> >>
> >> The partitioner has special understanding of the Message type and
> behaves as a DefaultPartitioner otherwise.
> >> Roughly,
> >>
> >>
> >> int partition(...) {
> >>return value instanceof Message ? specialPartition((Message)value)
> : defaultPartition(keyBytes);
> >> }
> >>
> >>
> >> This works great for KafkaProducer.  The "value" field is indeed my
> Message type and partitions are assigned
> >> correctly.
> >> Unfortunately it does *not* work with the stream producer, which causes
> very confusing behavior.  It turns out
> >> that the RecordCollectorImpl does its own serialization:
> >>
> >>
> >> byte[] keyBytes = keySerializer.serialize(topic, key);
> >> byte[] valBytes = valueSerializer.serialize(topic, value);
> >> if (partition == null && partitioner != null) {
> >>List<PartitionInfo> partitions = this.producer.partitionsFor(topic);
> >>if (partitions != null && partitions.size() > 0)
> >>  

Re: Need help in understanding bunch of rocksdb errors on kafka_2.10-0.10.1.1

2017-02-10 Thread Mathieu Fenniak
Hi Sachin,

Streams apps can be configured with a rocksdb.config.setter, which is a
class name that needs to implement
the org.apache.kafka.streams.state.RocksDBConfigSetter interface; this can
be used to reduce the memory utilization of RocksDB.  Here's an example
class that trims it way down (note that this is Kotlin code, not Java):

package com.replicon.streamProcessing.timesheetList

import org.rocksdb.Options
import org.rocksdb.BlockBasedTableConfig

class RocksDBConfigSetter :
org.apache.kafka.streams.state.RocksDBConfigSetter {
// Default KS value is 100 megs block cache, 32 megs write buffer, but
that's awfully expensive on memory.  This
// trims it down to 2 megs per store.

private val WRITE_BUFFER_SIZE = 1 * 1024 * 1024L
private val BLOCK_CACHE_SIZE = 1 * 1024 * 1024L
private val BLOCK_SIZE = 4096L

override fun setConfig(storeName: String, options: Options, configs:
MutableMap<String, Any>) {
val tableConfig = BlockBasedTableConfig()
tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE)
tableConfig.setBlockSize(BLOCK_SIZE)

options.setTableFormatConfig(tableConfig)
options.setWriteBufferSize(WRITE_BUFFER_SIZE)
}
}


Which I then reference in my settings before starting my KafkaStreams
(still Kotlin):

settings.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
RocksDBConfigSetter::class.java)
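
For a plain Java application, an equivalent setter might look roughly like
this; the class name and sizes are illustrative and should be tuned for your
own workload:

import java.util.Map;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

public class SmallCacheRocksDBConfigSetter implements org.apache.kafka.streams.state.RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(1 * 1024 * 1024L);  // 1 MB block cache per store
        tableConfig.setBlockSize(4096L);
        options.setTableFormatConfig(tableConfig);
        options.setWriteBufferSize(1 * 1024 * 1024L);     // 1 MB write buffer per store
    }
}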




On Fri, Feb 10, 2017 at 12:13 AM, Sachin Mittal  wrote:

> Hi,
> We are running rocksdb with default configuration.
> I would try to monitor the rocks db, I do see the beans when I connect via
> jmx client.
>
> We use rocks db for aggregation. Our pipe line is:
>
> input
> .groupByKey()
> .aggregate(new Initializer<SortedSet<T>>() {
> public SortedSet<T> apply() {
> return new TreeSet<T>();
> }
> }, new Aggregator<String, T, SortedSet<T>>() {
> public SortedSet<T> apply(String key, T value, SortedSet<T> aggregate)
> {
> aggregate.add(value);
> return aggregate;
> }
> }, TimeWindows.of(...).advanceBy(...).until(...), valueSerde, "key-table")
> .mapValues(new ValueMapper<SortedSet<T>, SortedSet<T>>() {
> public SortedSet<T> apply(SortedSet<T> t) {
> SortedSet<T> u = new TreeSet<T>();
> ...
> return u;
> }
> })
> .foreach(new ForeachAction<Windowed<String>, SortedSet<T>>() {
> public void apply(Windowed<String> key, SortedSet<T> nodes) {
> ...
> }
> });
>
> So I guess rocksdb is used only in case of aggregate part.
>
> Is there some additional rocksdb setting that you would like us to try.
>
> Thanks
> Sachin
>
>
> On Fri, Feb 10, 2017 at 12:33 PM, Guozhang Wang 
> wrote:
>
> > Sachin,
> >
> > Thanks for sharing your observations, that are very helpful.
> >
> > Regards to monitoring, there are indeed metrics inside the Streams
> library
> > to meter state store operations; for rocksdb it records the average
> latency
> > and callrate for put / get / delete / all / range and flush / restore.
> You
> > can find the corresponding metric names in the monitoring section on web
> > docs once 0.10.2 is out:
> >
> > https://kafka.apache.org/documentation/#monitoring
> >
> > Or you can just grep all metric names and see which ones are of interests
> > to you.
> >
> >
> > Regards to the observations, I'm particularly interested in the fact that
> > "memory utilization keeps increasing". But since rocksdb uses both
> on-heap
> > and off heap memory it is hard to tell if there is really a memory leak.
> > Could you share your usage of the state stores, like are they used for
> > aggregations / joins / else, and did you override any of the rocksdb
> config
> > settings?
> >
> >
> > Guozhang
> >
> >
> > On Thu, Feb 9, 2017 at 8:58 PM, Sachin Mittal 
> wrote:
> >
> > > Hi,
> > > We recently upgraded to 0.10.2.0-rc0, the rocksdb issue seems to be
> > > resolved however we are just not able to get the streams going under
> our
> > > current scenario.
> > > The issue seems to be still related to rocksdb.
> > > Let me explain the situation:
> > >
> > > We have 40 partitions and 12 threads distributed across three machines
> > > (each having 4).
> > > Roughly 15 partitions get assigned to each machine. Thus we have 15 state
> > > stores on each machine.
> > >
> > > What we see is that streams frequently gets rebalanced due to
> > > CommitFailedException and, as you have said, this should not happen once
> > > the application reaches steady state.
> > > We are running an instance on a 4 core linux box (virtual machine). What we
> > > observe is that there is a lot of waiting time for each core, consistently
> > > greater than 50%. What we suspect is that the application is spending a lot
> > > of time on disk I/O, so the CPU keeps waiting. Also we observe that the rate
> > > of consumption from the source topic falls over time. Also over time we see
> > > that CPU utilization falls and memory utilization increases.
> > >
> > > Then what we did was use 12 partitions for the same topic and 12 threads, so
> > > each thread processes from a single partition and 4 state stores get created
> > > per machine. (note streams

Re: KTable and cleanup.policy=compact

2017-02-08 Thread Mathieu Fenniak
I think there could be correctness implications... the default
cleanup.policy of delete would mean that topic entries past the retention
policy might have been removed.  If you scale up the application, new
application instances won't be able to restore a complete table into their
local state stores.  An operation like a join against that KTable would find
no records where there should be records.

Mathieu


On Wed, Feb 8, 2017 at 12:15 PM, Eno Thereska 
wrote:

> If you fail to set the policy to compact, there shouldn't be any
> correctness implications, however your topics will grow larger than
> necessary.
>
> Eno
>
> > On 8 Feb 2017, at 18:56, Jon Yeargers  wrote:
> >
> > What are the ramifications of failing to do this?
> >
> > On Tue, Feb 7, 2017 at 9:16 PM, Matthias J. Sax 
> > wrote:
> >
> >> Yes, that is correct.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 2/7/17 6:39 PM, Mathieu Fenniak wrote:
> >>> Hey kafka users,
> >>>
> >>> Is it correct that a Kafka topic that is used for a KTable should be
> set
> >> to
> >>> cleanup.policy=compact?
> >>>
> >>> I've never noticed until today that the KStreamBuilder#table()
> >>> documentation says: "However, no internal changelog topic is created
> >> since
> >>> the original input topic can be used for recovery"... [1], which seems
> >> like
> >>> it is only true if the topic is configured for compaction.  Otherwise
> the
> >>> original input topic won't necessarily contain the data necessary for
> >>> recovery of the state store.
> >>>
> >>> [1]
> >>> https://github.com/apache/kafka/blob/e108a8b4ed4512b021f9326cf07951
> >> 7523c83060/streams/src/main/java/org/apache/kafka/streams/
> >> kstream/KStreamBuilder.java#L355
> >>>
> >>> Thanks,
> >>>
> >>> Mathieu
> >>>
> >>
> >>
>
>


KTable and cleanup.policy=compact

2017-02-07 Thread Mathieu Fenniak
Hey kafka users,

Is it correct that a Kafka topic that is used for a KTable should be set to
cleanup.policy=compact?

I've never noticed until today that the KStreamBuilder#table()
documentation says: "However, no internal changelog topic is created since
the original input topic can be used for recovery"... [1], which seems like
it is only true if the topic is configured for compaction.  Otherwise the
original input topic won't necessarily contain the data necessary for
recovery of the state store.

[1]
https://github.com/apache/kafka/blob/e108a8b4ed4512b021f9326cf079517523c83060/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java#L355

Thanks,

Mathieu


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-02-07 Thread Mathieu Fenniak
On Mon, Feb 6, 2017 at 2:35 PM, Matthias J. Sax 
wrote:

> - adding KStreamBuilder#topologyBuilder() seems like a good idea to
> address any concern with limited access to TopologyBuilder and the DSL/PAPI
> mix-and-match approach. However, we should try to cover as much as
> possible with #process(), #transform() etc.
>

That sounds like it'll work for me.


> - about TopologyBuilder.nodeGroups & TopologyBuilder.build: not sure
> what analysis you do -- there is also KafkaStreams#toString() that
> describes the topology/DAG of the job. @Mathieu: Could you use this for
> your analysis?
>

Well, I'd like to be able to output a graphviz diagram of my processor
topology.  I am aware of KafkaStreams#toString(), but it isn't the format
I want; if I remember correctly, I found it was ambiguous to parse &
transform, and it also has the limitation of requiring a running and
processing application, as toString() doesn't return anything useful until
the consumer stream threads are running.

What I've whipped up with the existing ProcessorTopology API (
https://gist.github.com/mfenniak/04f9c0bea8a1a2e0a747d678117df9f7) just
builds a "dry" topology (ie. no data being processed) and outputs a graph.
It's hooked into my app so that I can run with a specific command-line
option to output the graph without having to start the processor.

It's not the worst thing in the world to lose, or to have to jump through
some reflection hoops to do. :-)  Perhaps a better approach would be to
have an API designed specifically for this kind of introspection,
independent of the much more commonly used API to build a topology.

Mathieu


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-02-06 Thread Mathieu Fenniak
Hi Matthias,

I use a few of the methods that you're pointing out that will be deprecated
and don't have an apparent alternative, so I wanted to just let you know
what they are and what my use-cases are for them.

First of all, I use a combination of DSL and PAPI in the same application
very happily.  It looks like this API would not permit that at all... it
looks like the intent here is to block that kind of usage, but as the DSL
is not extensible (short of building on "internal" API classes and using
reflection), this creates some real limitations in the type of applications
that can be built.

TopologyBuilder.addInternalTopic -- as we've discussed before, I have some
reusable join components that build multiple processors w/ internal
repartition topics.  We use addInternalTopic to create application-id
prefixed repartition topics.  This could be worked around fairly easily.

TopologyBuilder.nodeGroups & TopologyBuilder.build -- I have an option in
my KS app to run once & output a graphviz document with my entire topology
for debugging and analysis purposes; I use these methods to
create ProcessorTopology instances to inspect the topology and create this
output.  I don't really see any alternative approach with this new API.

KStreamBuilder.newName -- Similar to addInternalTopic, I use this to create
processor names in reusable components.  Lacking this method would be
fairly easy to work around.

Mathieu


On Fri, Feb 3, 2017 at 4:33 PM, Matthias J. Sax 
wrote:

> Hi All,
>
> I did prepare a KIP to do some cleanup some of Kafka's Streaming API.
>
> Please have a look here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 120%3A+Cleanup+Kafka+Streams+builder+API
>
> Looking forward to your feedback!
>
>
> -Matthias
>
>


Re: Streams InvalidStateStoreException: Store ... is currently closed

2017-02-02 Thread Mathieu Fenniak
Thanks for the quick response Damian.  I'll update my processors and
retest. 👍
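
In other words, a custom processor can retrieve its stores in init() and leave
close() empty, letting the framework manage the store lifecycle as Damian
describes below.  A minimal sketch, with an illustrative store name and types:

public class MyProcessor implements Processor<String, String> {
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // Retrieve the store; the framework owns its lifecycle.
        store = (KeyValueStore<String, String>) context.getStateStore("my-store");
    }

    @Override
    public void process(String key, String value) {
        store.put(key, value);
    }

    @Override
    public void punctuate(long timestamp) { }

    @Override
    public void close() {
        // Intentionally empty: state stores are closed automatically
        // during rebalances and shutdown.
    }
}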

On Thu, Feb 2, 2017 at 9:27 AM, Damian Guy  wrote:

> Hi Matthew,
> You shouldn't close the stores in your custom processors. They are closed
> automatically by the framework during rebalances and shutdown.
> There is a good chance that your closing of the stores is causing the
> issue. Of course if you see the exception again then please report back so
> we can investigate further.
>
> Thanks,
> Damian
>
> On Thu, 2 Feb 2017 at 16:12 Mathieu Fenniak 
> wrote:
>
> > Hey all,
> >
> > When an instance of a streams Processor is closed, is it supposed to call
> > close() on any state stores that it retrieved from the ProcessorContext
> in
> > its own close()?
> >
> > I started following the pattern of having every Processor close every
> state
> > store based upon this documentation's example (
> > http://docs.confluent.io/3.1.1/streams/developer-guide.
> html#processor-api
> > ),
> > but, I see that at least some processors in Kafka Streams don't close
> their
> > state stores (eg.
> >
> > https://github.com/apache/kafka/blob/a95170f87c50414c57860e8547
> dc2e9d84cb/streams/src/main/java/org/apache/kafka/streams/
> kstream/internals/KTableSource.java#L46
> > ).
> >
> > I've just pulled down Kafka Streams 0.10.2.0 RC0 to give it a test with
> my
> > streams application, and I'm getting an error after the app is running
> for
> > a while: org.apache.kafka.streams.errors.InvalidStateStoreException:
> Store
> > projectTimeAllocation-projectBillingRateHistory-historical-lookup-store
> is
> > currently closed.  (full log:
> > https://gist.github.com/mfenniak/cd108ad655ca63252be550c7b96414c5)  I
> > think
> > that this is probably caused by having multiple custom processors
> attached
> > to one state store and both of them closing it, so that's why I'm trying
> to
> > determine whether this is the right behavior for me to do, or whether
> this
> > exception might be a bug in Kafka Streams?
> >
> > Mathieu
> >
>


Streams InvalidStateStoreException: Store ... is currently closed

2017-02-02 Thread Mathieu Fenniak
Hey all,

When an instance of a streams Processor is closed, is it supposed to call
close() on any state stores that it retrieved from the ProcessorContext in
its own close()?

I started following the pattern of having every Processor close every state
store based upon this documentation's example (
http://docs.confluent.io/3.1.1/streams/developer-guide.html#processor-api),
but, I see that at least some processors in Kafka Streams don't close their
state stores (eg.
https://github.com/apache/kafka/blob/a95170f87c50414c57860e8547dc2e9d84cb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L46
).

I've just pulled down Kafka Streams 0.10.2.0 RC0 to give it a test with my
streams application, and I'm getting an error after the app is running for
a while: org.apache.kafka.streams.errors.InvalidStateStoreException: Store
projectTimeAllocation-projectBillingRateHistory-historical-lookup-store is
currently closed.  (full log:
https://gist.github.com/mfenniak/cd108ad655ca63252be550c7b96414c5)  I think
that this is probably caused by having multiple custom processors attached
to one state store and both of them closing it, so that's why I'm trying to
determine whether this is the right behavior for me to do, or whether this
exception might be a bug in Kafka Streams?

Mathieu


Re: Kafka Connect requestTaskReconfiguration

2017-01-15 Thread Mathieu Fenniak
OK, thanks Ewen.


On Sun, Jan 15, 2017 at 4:42 PM, Ewen Cheslack-Postava 
wrote:

> This is currently expected. Internally the Connect cluster uses the same
> rebalancing process as consumer groups which means it has similar
> limitations -- all tasks must stop just as you would need to stop consuming
> from all partitions and commit offsets during a consumer group rebalance.
>
> There's an issue filed about this: https://issues.apache.org/
> jira/browse/KAFKA-3351 and it's something that we're aware will eventually become a
> scalability limitation. There are some ideas about how to avoid this, but
> nothing concrete on the roadmap yet.
>
> -Ewen
>
> On Fri, Jan 13, 2017 at 10:32 AM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
> > Hey kafka-users,
> >
> > Is it normal for a Kafka Connect source connector that
> > calls requestTaskReconfiguration to cause all the connectors on the
> > kafka-connect distributed system to be stopped and started?
> >
> > One of my three connectors (2x source, 1x sink) runs a background thread
> > that will occasionally invoke `context.requestTaskReconfiguration()`
> when
> > it detects new work that it wants to distribute to its tasks.  When this
> > occurs, I observe all three of the connectors stop and start.  One of the
> > other connectors doesn't have the smoothest stop/start cycle, so I'm
> hoping
> > that this might be avoidable?
> >
> > I'm running on Kafka Connect 0.10.1.0.
> >
> > Mathieu
> >
>


Kafka Connect requestTaskReconfiguration

2017-01-13 Thread Mathieu Fenniak
Hey kafka-users,

Is it normal for a Kafka Connect source connector that
calls requestTaskReconfiguration to cause all the connectors on the
kafka-connect distributed system to be stopped and started?

One of my three connectors (2x source, 1x sink) runs a background thread
that will occasionally invoke `context.requestTaskReconfiguration()` when
it detects new work that it wants to distribute to its tasks.  When this
occurs, I observe all three of the connectors stop and start.  One of the
other connectors doesn't have the smoothest stop/start cycle, so I'm hoping
that this might be avoidable?

I'm running on Kafka Connect 0.10.1.0.

Mathieu


Connect: SourceTask poll & commit interaction

2016-12-10 Thread Mathieu Fenniak
Hi Kafka Users,

I'm looking for a bit of clarification on the documentation for
implementing a SourceTask.  I'm reading a replication stream from a
database in my SourceTask, and I'd like to use commit or commitRecord to
advance the other system's replication stream pointer so that it knows I
have successfully read & committed the records to Kafka.  This allows the
other system to discard unneeded transaction logs.

But I'm uncertain how to use either of SourceTask's commit or commitRecord
correctly.

For commit, the documentation says that it should "Commit the offsets, up
to the offsets that have been returned by poll().".  When commit() is
executed, will poll() currently be running on another thread?  I assume it
must be, because poll should block, and that would imply you can't commit
the tailing end of some activity.  If commit is invoked while poll is being
invoked, I'm concerned that I can't reliably determine where to advance my
replication stream pointer to -- if I store the location at the end of
poll, commit might be invoked while poll is still returning some records,
and advance the pointer further than actually guaranteed.

commitRecord on the other hand is invoked per-record.  The documentation
says "Commit an individual SourceRecord when the callback from the producer
client is received."  But if I'm producing to N partitions on different
brokers, I believe that the producer callback is not called in any
guaranteed order, so I can't advance my replication stream pointer to any
single record since an older record being delivered to another partition
may not have been committed.

The only solution I can see so far is to maintain the replication stream
positions of all the source records that I've returned from poll, and
advance the replication pointer in commitRecord only when the lowest
outstanding record is committed.
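
For what it's worth, a minimal sketch of that bookkeeping, assuming each
record's source position can be expressed as an increasing long; the class and
method names are hypothetical, not part of the Connect API:

import java.util.concurrent.ConcurrentSkipListSet;

public class LowWatermarkTracker {
    private final ConcurrentSkipListSet<Long> inFlight = new ConcurrentSkipListSet<>();
    private long highestCommitted = -1L;

    // Call from poll() for every SourceRecord handed to the framework.
    public synchronized void recordReturned(long position) {
        inFlight.add(position);
    }

    // Call from commitRecord() once the producer has acknowledged the record.
    // Returns the position the upstream replication pointer can safely advance to.
    public synchronized long recordCommitted(long position) {
        inFlight.remove(position);
        highestCommitted = Math.max(highestCommitted, position);
        // If nothing is outstanding, everything returned so far is in Kafka;
        // otherwise only positions below the lowest outstanding record are safe.
        return inFlight.isEmpty() ? highestCommitted : inFlight.first() - 1;
    }
}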

Is there anything I've misunderstood or misinterpreted?

Thanks,

Mathieu


Re: Running cluster of stream processing application

2016-12-08 Thread Mathieu Fenniak
Hi Sachin,

Some quick answers, and a link to some documentation to read more:

- If you restart the application, it will start from the point it crashed
(possibly reprocessing a small window of records).

- You can run more than one instance of the application.  They'll
coordinate by virtue of being part of a Kafka consumer group; if one
crashes, the partitions that it was reading from will be picked up by other
instances.

- When running more than one instance, the tasks will be distributed
between the instances.

Confluent's docs on the Kafka Streams architecture go into a lot more
detail: http://docs.confluent.io/3.0.0/streams/architecture.html
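
Concretely, "running more than one instance" just means starting the same
application, with the same application.id, on several machines; a minimal
sketch, with a placeholder id and broker address:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");  // identical on every instance
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");

KStreamBuilder builder = new KStreamBuilder();
// ... build the same topology on every instance ...

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();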




On Thu, Dec 8, 2016 at 9:05 PM, Sachin Mittal  wrote:

> Hi All,
> We were able to run a stream processing application against a fairly decent
> load of messages in production environment.
>
> To make the system robust say the stream processing application crashes, is
> there a way to make it auto start from the point when it crashed?
>
> Also is there any concept like running the same application in a cluster,
> where one fails, other takes over, until we bring back up the failed node
> of streams application.
>
> If yes, is there any guidelines or some knowledge base we can look at to
> understand how this would work.
>
> Is there way like in spark, where the driver program distributes the tasks
> across various nodes in a cluster, is there something similar in kafka
> streaming too.
>
> Thanks
> Sachin
>


Re: Attempting to put a clean entry for key [...] into NamedCache [...] when it already contains a dirty entry for the same key

2016-12-05 Thread Mathieu Fenniak
Hi Damian,

Yes... I can see how most of the stack trace is rather meaningless.
Unfortunately I don't have a minimal test case, and I don't want to burden
you by dumping the entire application.  (I could share it privately, if
you'd like.)

Based upon the stack trace, the relevant pieces involved are a multi-table
join (KTable [leftJoin KTable]*20) that accumulates pieces of data into a
map; one of the joined tables is an aggregation (KTable -> filter ->
groupBy -> aggregate "TimesheetNonBillableHours") that would have the
affected cache.

Mathieu


On Mon, Dec 5, 2016 at 8:36 AM, Damian Guy  wrote:

> Hi Mathieu,
>
> I'm trying to make sense of the rather long stack trace in the gist you
> provided. Can you possibly share your streams topology with us?
>
> Thanks,
> Damian
>
> On Mon, 5 Dec 2016 at 14:14 Mathieu Fenniak 
> wrote:
>
> > Hi Eno,
> >
> > This exception occurred w/ trunk @ e43bbce (current as-of Saturday).  I
> was
> > bit by KAFKA-4311 (I believe) when trying to upgrade to 0.10.1.0, so with
> > that issue now resolved I thought I'd check trunk out to see if any other
> > issues remain.
> >
> > Mathieu
> >
> >
> > On Sun, Dec 4, 2016 at 12:37 AM, Eno Thereska 
> > wrote:
> >
> > > Hi Mathieu,
> > >
> > > What version of Kafka are you using? There was recently a fix that went
> > > into trunk, just checking if you're using an older version.
> > > (to make forward progress you can turn the cache off, like this:
> > > streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_
> BUFFERING_CONFIG,
> > > 0);
> > > )
> > >
> > > Thanks
> > > Eno
> > > > On 4 Dec 2016, at 03:47, Mathieu Fenniak <
> mathieu.fenn...@replicon.com
> > >
> > > wrote:
> > > >
> > > > Hey all,
> > > >
> > > > I've just been running a quick test of my kafka-streams application
> on
> > > the
> > > > latest Kafka trunk (@e43bbce), and came across this error.  I was
> > > wondering
> > > > if anyone has seen this error before, have any thoughts on what might
> > > cause
> > > > it, or can suggest a direction to investigate it further.
> > > >
> > > > Full exception:
> > > > https://gist.github.com/mfenniak/509fb82dfcfda79a21cfc1b07dafa89c
> > > >
> > > > java.lang.IllegalStateException: Attempting to put a clean entry for
> > key
> > > > [urn:replicon-tenant:strprc971e3ca9:timesheet:
> 97c0ce25-e039-4e8b-9f2c-
> > > d43f0668b755]
> > > > into NamedCache [0_0-TimesheetNonBillableHours] when it already
> > > contains a
> > > > dirty entry for the same key
> > > > at
> > > > org.apache.kafka.streams.state.internals.NamedCache.
> > > put(NamedCache.java:124)
> > > > at
> > > > org.apache.kafka.streams.state.internals.ThreadCache.
> > > put(ThreadCache.java:120)
> > > > at
> > > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(
> > > CachingKeyValueStore.java:146)
> > > > at
> > > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(
> > > CachingKeyValueStore.java:133)
> > > > at
> > > > org.apache.kafka.streams.kstream.internals.KTableAggregate$
> > > KTableAggregateValueGetter.get(KTableAggregate.java:128)
> > > > at
> > > > org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$
> > > KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:81)
> > > > at
> > > > org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$
> > > KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:54)
> > > > at
> > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > > ProcessorNode.java:82)
> > > > ... more ...
> > >
> > >
> >
>


Re: Attempting to put a clean entry for key [...] into NamedCache [...] when it already contains a dirty entry for the same key

2016-12-05 Thread Mathieu Fenniak
Hi Eno,

This exception occurred w/ trunk @ e43bbce (current as-of Saturday).  I was
bit by KAFKA-4311 (I believe) when trying to upgrade to 0.10.1.0, so with
that issue now resolved I thought I'd check trunk out to see if any other
issues remain.

Mathieu


On Sun, Dec 4, 2016 at 12:37 AM, Eno Thereska 
wrote:

> Hi Mathieu,
>
> What version of Kafka are you using? There was recently a fix that went
> into trunk, just checking if you're using an older version.
> (to make forward progress you can turn the cache off, like this:
> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
> 0);
> )
>
> Thanks
> Eno
> > On 4 Dec 2016, at 03:47, Mathieu Fenniak 
> wrote:
> >
> > Hey all,
> >
> > I've just been running a quick test of my kafka-streams application on
> the
> > latest Kafka trunk (@e43bbce), and came across this error.  I was
> wondering
> > if anyone has seen this error before, have any thoughts on what might
> cause
> > it, or can suggest a direction to investigate it further.
> >
> > Full exception:
> > https://gist.github.com/mfenniak/509fb82dfcfda79a21cfc1b07dafa89c
> >
> > java.lang.IllegalStateException: Attempting to put a clean entry for key
> > [urn:replicon-tenant:strprc971e3ca9:timesheet:97c0ce25-e039-4e8b-9f2c-
> d43f0668b755]
> > into NamedCache [0_0-TimesheetNonBillableHours] when it already
> contains a
> > dirty entry for the same key
> > at
> > org.apache.kafka.streams.state.internals.NamedCache.
> put(NamedCache.java:124)
> > at
> > org.apache.kafka.streams.state.internals.ThreadCache.
> put(ThreadCache.java:120)
> > at
> > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(
> CachingKeyValueStore.java:146)
> > at
> > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(
> CachingKeyValueStore.java:133)
> > at
> > org.apache.kafka.streams.kstream.internals.KTableAggregate$
> KTableAggregateValueGetter.get(KTableAggregate.java:128)
> > at
> > org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$
> KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:81)
> > at
> > org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$
> KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:54)
> > at
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:82)
> > ... more ...
>
>


Re: lag monitoring

2016-12-05 Thread Mathieu Fenniak
Hi Jon,

Here are some lag monitoring options that are external to the consumer
application itself; I don't know if these will be appropriate for you.  You
can use a command-line tool like kafka-consumer-groups.sh to monitor
consumer group lag externally (
http://kafka.apache.org/documentation.html#basic_ops_consumer_group), you
can use Kafka JMX metrics to monitor lag (
http://kafka.apache.org/documentation.html#monitoring), or you can use an
external tool like Burrow (https://github.com/linkedin/Burrow).
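
If you do keep a live consumer around, the same metrics that back the JMX
beans are also available in-process; a rough sketch, assuming a 0.10.x
KafkaConsumer instance named consumer and the "records-lag-max" fetch metric:

for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
    if ("records-lag-max".equals(entry.getKey().name())) {
        // Maximum lag, in records, across the partitions this consumer fetches from.
        System.out.println("max record lag: " + entry.getValue().value());
    }
}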

Mathieu



On Mon, Dec 5, 2016 at 4:47 AM, Jon Yeargers 
wrote:

> Is there a way to get updated consumer position(s) without subscribing to a
> topic? I can achieve this by continually closing / reopening a
> KafkaConsumer object but this is problematic as it often times out.
>
> Im getting consumer lag from a combination of
>
> (start) .seekToEnd() (and then) .position()
>
> and
>
> (end) .committed()
>
> but if I wait and call this set of functions again I get the same values.
>
> How do I refresh this connection without interfering with the actual
> consumer group position(s)?
>


Attempting to put a clean entry for key [...] into NamedCache [...] when it already contains a dirty entry for the same key

2016-12-03 Thread Mathieu Fenniak
Hey all,

I've just been running a quick test of my kafka-streams application on the
latest Kafka trunk (@e43bbce), and came across this error.  I was wondering
if anyone has seen this error before, have any thoughts on what might cause
it, or can suggest a direction to investigate it further.

Full exception:
https://gist.github.com/mfenniak/509fb82dfcfda79a21cfc1b07dafa89c

java.lang.IllegalStateException: Attempting to put a clean entry for key
[urn:replicon-tenant:strprc971e3ca9:timesheet:97c0ce25-e039-4e8b-9f2c-d43f0668b755]
into NamedCache [0_0-TimesheetNonBillableHours] when it already contains a
dirty entry for the same key
at
org.apache.kafka.streams.state.internals.NamedCache.put(NamedCache.java:124)
at
org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:120)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:146)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:133)
at
org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateValueGetter.get(KTableAggregate.java:128)
at
org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:81)
at
org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:54)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
... more ...


Re: Initializing StateStores takes *really* long for large datasets

2016-11-30 Thread Mathieu Fenniak
I'd like to quickly reinforce Frank's opinion regarding the rocksdb memory
usage.  I was also surprised by the amount of non-JVM-heap memory being
used and had to tune the 100 MB default down considerably.  It's also
unfortunate that it's hard to estimate the memory requirements for a KS app
because of this.  If you have ten stores, and assuming the default config,
you'd need a GB of memory for the rocksdb cache if you run 1 app, but only
half a GB if you run two app instances because the stores will be
distributed.

It would be much nicer to be able to give KS a fixed amount of memory in a
config that it divides among the active stores on a node.  Configure it
with N GB; if a rebalance adds more tasks and stores, they each get less
RAM; if a rebalance removes tasks and stores, the remaining stores get more
RAM.  It seems like it'd be hard to do this with the RocksDBConfigSetter
interface because it doesn't get any state about the KS topology to make
these decisions, which are arguably not config, but tuning / performance
decisions.

Mathieu



On Mon, Nov 28, 2016 at 3:45 PM, Frank Lyaruu  wrote:

> I'll write an update on where I am now.
>
> I've got about 40 'primary' topics, some small, some up to about 10M
> messages,
> and about 30 internal topics, divided over 6 stream instances, all running
> in a single
> app, talking to a 3 node Kafka cluster.
>
> I use a single thread per stream instance, as my prime concern is now to
> get it
> to run stable, rather than optimizing performance.
>
> My biggest issue was that after a few hours my application started to slow
> down
> to ultimately freeze up or crash. It turned out that RocksDb consumed all
> my
> memory, which I overlooked as it was off-heap.
>
> I was fooling around with RocksDb settings a bit but I had missed the most
> important
> one:
>
> BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
> tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
> tableConfig.setBlockSize(BLOCK_SIZE);
> options.setTableFormatConfig(tableConfig);
>
> The block cache size defaults to a whopping 100Mb per store, and that gets
> expensive
> fast. I reduced it to a few megabytes. My data size is so big that I doubt
> it is very effective
> anyway. Now it seems more stable.
>
> I'd say that a smaller default makes sense, especially because the failure
> case is
> so opaque (running all tests just fine but with a serious dataset it dies
> slowly)
>
> Another thing I see is that while starting all my instances, some are quick
> and some take
> time (makes sense as the data size varies greatly), but as more instances
> start up, they
> start to use more and more CPU I/O and network, that the initialization of
> the bigger ones
> takes even longer, increasing the chance that one of them takes longer than
> the
> MAX_POLL_INTERVAL_MS_CONFIG, and then all hell breaks loose. Maybe we can
> separate the 'initialize' and 'start' step somehow.
>
> In this case we could log better: If initialization is taking longer than
> the timeout, it ends up
> being reassigned (in my case to the same instance) and then it errors out
> on being unable
> to lock the state dir. That message isn't too informative as the timeout is
> the actual problem.
>
> regards, Frank
>
>
> On Mon, Nov 28, 2016 at 8:01 PM, Guozhang Wang  wrote:
>
> > Hello Frank,
> >
> > How many instances do you have in your apps and how many threads did you
> > use per thread? Note that besides the topology complexity (i.e. number of
> > state stores, number of internal topics etc) the (re-)initialization
> > process is depending on the underlying consumer's membership protocol,
> and
> > hence its rebalance latency could be longer with larger groups.
> >
> > We have been optimizing our rebalance latency due to state store
> migration
> > and restoration in the latest release, but for now the re-initialization
> > latency is still largely depends on 1) topology complexity regarding to
> > state stores and 2) number of input topic partitions and instance /
> threads
> > in the application.
> >
> >
> > Guozhang
> >
> >
> > On Sat, Nov 26, 2016 at 12:57 AM, Damian Guy 
> wrote:
> >
> > > Hi Frank,
> > >
> > > If you are running on a single node then the RocksDB state should be
> > > re-used by your app. However, it relies on the app being cleanly
> shutdown
> > > and the existence of ".checkpoint" files in the state directory for the
> > > store, .i.e, /tmp/kafka-streams/application-id/0_0/.checkpoint. If the
> > > file
> > > doesn't exist then the entire state will be restored from the
> changelog -
> > > which could take some time. I suspect this is what is happening?
> > >
> > > As for the RocksDB memory settings, yes the off heap memory usage does
> > > sneak under the radar. There is a memory management story for Kafka
> > Streams
> > > that is yet to be started. This would involve limiting the off-heap
> > memory
> > > that RocksDB uses.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On 

Handling out-of-order messaging w/ Kafka Streams

2016-09-26 Thread Mathieu Fenniak
Hey Apache Users,

I'm working on a web application that has a web service component, and a
background processor component.  Both applications will send messages to
the same Kafka topic as an object is manipulated.

In some cases, a web service call in the service component will send a
message to Kafka saying key K has state S1, then trigger a background
operation, and then the background component will send a message to Kafka
saying key K has state S2.  However, I'm finding that the topic ends up
occasionally having a message K/S2 followed by K/S1, rather than the other
way around.  As both producers in the web service call and the background
processor send messages asynchronously with librdkafka, I believe this is a
relatively simple race condition where messages just aren't coming in like
I'd like them to.

In a consuming Kafka Streams application, I'd be creating a KTable of this
topic.  What approaches can I take to ensure the the KTable will end up
with K/S2 as the state for K, rather than the stale-er K/S1?

Would KS reorder messages if they had ordered & coordinated timestamps?  If
so, how much leeway would it have for S2 being delivered before S1?  (I
believe librdkafka 0.9.1 doesn't support sending create-time in messages,
which makes this is a bit more painful.)

Any other approaches that are worth exploring?

Thanks for any thoughts,

Mathieu


Re: How to create a topic using the Java API / Client?

2016-09-14 Thread Mathieu Fenniak
Hey Ali,

If you have auto create turned on, which it sounds like you do, and you're
happy with using the broker's configured partition count and replication
factor, then you can call "partitionsFor(String topic)" on your producer.
This will create the topic without sending a message to it.  I'm not sure
it's 100% smart, but I found it to be better than the alternatives I could
find. :-)
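
A rough sketch of that, assuming auto.create.topics.enable=true on the broker;
the topic name and bootstrap address are placeholders:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // Requesting metadata is enough to trigger auto-creation; no message is sent.
    List<PartitionInfo> partitions = producer.partitionsFor("my-new-topic");
    System.out.println("topic created with " + partitions.size() + " partitions");
}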

Mathieu


On Wed, Sep 14, 2016 at 3:41 PM, Ali Akhtar  wrote:

> It looks like if I just send a message to a non-existent topic, it is
> created. But this has the downside that the first message of the topic is
> null or otherwise invalid.
>
> Also the partition count can't be specified.
>
> Is there a way to create a topic without needing to post a null message? Do
> I need to talk to the bash script?
>
> On Wed, Sep 14, 2016 at 8:45 AM, Ali Akhtar  wrote:
>
> > Using version 0.10.0.1, how can I create kafka topics using the java
> > client / API?
> >
> > Stackoverflow answers describe using kafka.admin.AdminUtils, but this
> > class is not included in the kafka-clients maven dependency. I also don't
> > see the package kafka.admin in the javadocs: http://kafka.apache.
> > org/0100/javadoc/index.html?org/apache/kafka/clients/produce
> > r/KafkaProducer.html
> >
> >
> > What am I missing?
> >
> >
>


Re: KTable aggregations send intermediate results downstream?

2016-08-24 Thread Mathieu Fenniak
Hi Guozhang,

I've been working around this issue by dropping down to the Processor
API, but I was hoping you might be able to point out if there is a
flaw in this proposed change:


https://github.com/apache/kafka/compare/trunk...mfenniak:suppress-duplicate-repartition-output

This adjusts KTableRepartitionMap so that if there's no change in the
group-by key, the repartition processor just forwards the changed
value onwards.  (This breaks a couple of tests that anticipate the
exact existing output, so don't consider this a complete patch...)

Mathieu


On Fri, Aug 19, 2016 at 12:29 PM, Guozhang Wang  wrote:
> Hi Mathieu,
>
> If you are only interested in the aggregate result "snapshot" but not its
> change stream (note that KTable itself is not actually a "table" as in
> RDBMS, but still a stream), you can try to use the queryable state feature
> that is available in trunk, which will be available in 0.10.1.0 release.
>
> In sum, it allows you to query any states "snapshot" which is used in
> aggregation operators in real time with state store provided APIs such as
> get-by-key, range queries on windows, etc. Details can be found in thie KIP
> (we are working on more docs / blog posts at the time):
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
>
> Guozhang
>
>
> On Thu, Aug 18, 2016 at 6:40 AM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
>> Hi Guozhang,
>>
>> Hm... I hadn't thought of the repartitioning involvement.
>>
>> I'm not confident I'm understanding completely, but I believe you're
>> saying the decision to process data in this way is made before the
>> data being processed is available, because the partition *may* change,
>> because the groupBy key *may* change.
>>
>> I'm still feeling that I'm stuck getting corrupted output in the
>> middle of an aggregation.
>>
>> It's especially problematic for me if the updates to the source KTable
>> don't actually affect the results of the aggregation.  In the
>> word-count example in my original e-mail, this might be similar to
>> editing an unrelated field "author" in any article; doesn't actually
>> affect the groupBy, doesn't affect the aggregation, but still results
>> in the wrong output occurring temporarily.  (and inefficient
>> processing)
>>
>> Are there any tools in Kafka Streams that might help me prevent
>> downstream calculations if the relevant inputs haven't changed?  I was
>> thinking I'd be able to use mapValues to pluck only relevant fields
>> out of a KTable, materialize a new KTable (.through) from that, and
>> then there'd be some state from which KS would be able to only invoke
>> downstream nodes if data has changed... but it doesn't seem to work
>> like that.
>>
>> Thanks so much for your responses Guozhang, I really appreciate your
>> time to help me out.
>>
>> Mathieu
>>
>>
>> On Wed, Aug 17, 2016 at 5:51 PM, Guozhang Wang  wrote:
>> > The problem is that Kafka Streams need to repartition the streams based
>> on
>> > the groupBy keys when doing aggregations. For your case, the original
>> > stream may be read from a topic that is partitioned on "K", and you need
>> to
>> > first repartition on "category" on an intermediate topic before the
>> > aggregation can be executed.
>> >
>> > Hence the old and new value may be sent to two different partitions of
>> the
>> > intermediate topic, and hence be processed by two different process (it
>> > won't be the case in your application, since you mentioned the "category"
>> > will never change). Since the library cannot tell if the groupBy key will
>> > never change, it has to be conservative and do this subtract / add
>> process
>> > while receiving the old / new value.
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Wed, Aug 17, 2016 at 1:45 PM, Mathieu Fenniak <
>> > mathieu.fenn...@replicon.com> wrote:
>> >
>> >> Hi Guozhang,
>> >>
>> >> Thanks for responding.  Ah, I see what you're saying... in the case of
>> >> an update to the KTable, the aggregator's subtractor result would be
>> >> necessary if the group-by key changes in the update.
>> >>
>> >> It makes sense, but unfortunately the behavior leaves me feeling a
>> >> little sketchy... when the group-by key doesn't chang

Re: Build Issue with Gradle

2016-08-23 Thread Mathieu Fenniak
Oh, that's interesting.  It looks like the Gradle Wrapper's jar file
was intentionally removed from the kafka source tree
(https://issues.apache.org/jira/browse/KAFKA-2098), which would cause
this error.  The README file in the repo says to run "gradle" first,
which will install the wrapper binary, which you can then use... but
this doesn't work when using gradle 3.0 due to the incompatibility
introduced.

The best approach is probably to install an older version of gradle
with homebrew.  This ought to work:

brew unlink gradle
brew tap homebrew/versions
brew install gradle28

Then you should be able to follow the README instructions, which start
with running gradle, then using the gradle wrapper for any builds you
want to run.

Mathieu



On Tue, Aug 23, 2016 at 6:21 AM, Sankar Narayanan
 wrote:
> Hi,
>
> I got your response as below.
>
> "Hi Sankar,
>
> It looks like Kafka's build scripts are not compatible with Gradle
> 3.0.  I'd suggest using the gradle wrapper (./gradlew) included in the
> Kafka repo, which will automatically install Gradle 2.13 which is
> compatible with the build scripts.
>
> Mathieu",
>
>
> When I tried running the gradle wrapper (./gradlew) directly, I am getting
> the following error.
>
> That is the reason I tried installing gradle using homebrew, which
> finally ended up installing gradle 3.0.  Can you please help me
> overcome the below error?
>
>
> $ ./gradlew
>
> Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain
>
> Thanks,
>
>
>
> On Mon, Aug 22, 2016 at 2:30 PM, Sankar Narayanan > wrote:
>
>> Hi,
>>
>> I am facing the below issue while trying to build kafka using gradle.  can
>> you please help?
>>
>>  I installed gradle 3.0 and then git cloned kafka from the apache site. Then I ran
>> gradle under the kafka folder. But I got the error "No such property: useAnt
>> for class: org.gradle.api.tasks.scala.ScalaCompileOptions". I looked at
>> build.gradle for useAnt and removed that line. The error still persisted.
>> Even after I removed all scalaCompile related parts from build.gradle I still get
>> the same error.
>>
>> I searched online and found that useAnt is deprecated in gradle 3.0, but
>> why does this error still happen if I removed useAnt from build.gradle?
>>
>> Thanks,
>>


Re: Build Issue with Gradle

2016-08-22 Thread Mathieu Fenniak
Hi Sankar,

It looks like Kafka's build scripts are not compatible with Gradle
3.0.  I'd suggest using the gradle wrapper (./gradlew) included in the
Kafka repo, which will automatically install Gradle 2.13 which is
compatible with the build scripts.

Mathieu


On Mon, Aug 22, 2016 at 3:00 AM, Sankar Narayanan
 wrote:
> Hi,
>
> I am facing the below issue while trying to build kafka using gradle.  can
> you please help?
>
>  I installed gradle 3.0 and then git cloned kafka from the apache site. Then I ran
> gradle under the kafka folder. But I got the error "No such property: useAnt
> for class: org.gradle.api.tasks.scala.ScalaCompileOptions". I looked at
> build.gradle for useAnt and removed that line. The error still persisted.
> Even after I removed all scalaCompile related parts from build.gradle I still get
> the same error.
>
> I searched online and found that useAnt is deprecated in gradle 3.0, but
> why does this error still happen if I removed useAnt from build.gradle?
>
> Thanks,


Re: KTable aggregations send intermediate results downstream?

2016-08-18 Thread Mathieu Fenniak
Hi Guozhang,

Hm... I hadn't thought of the repartitioning involvement.

I'm not confident I'm understanding completely, but I believe you're
saying the decision to process data in this way is made before the
data being processed is available, because the partition *may* change,
because the groupBy key *may* change.

I'm still feeling that I'm stuck getting corrupted output in the
middle of an aggregation.

It's especially problematic for me if the updates to the source KTable
don't actually affect the results of the aggregation.  In the
word-count example in my original e-mail, this might be similar to
editing an unrelated field "author" in any article; doesn't actually
affect the groupBy, doesn't affect the aggregation, but still results
in the wrong output occurring temporarily.  (and inefficient
processing)

Are there any tools in Kafka Streams that might help me prevent
downstream calculations if the relevant inputs haven't changed?  I was
thinking I'd be able to use mapValues to pluck only relevant fields
out of a KTable, materialize a new KTable (.through) from that, and
then there'd be some state from which KS would be able to only invoke
downstream nodes if data has changed... but it doesn't seem to work
like that.

Thanks so much for your responses Guozhang, I really appreciate your
time to help me out.

Mathieu


On Wed, Aug 17, 2016 at 5:51 PM, Guozhang Wang  wrote:
> The problem is that Kafka Streams need to repartition the streams based on
> the groupBy keys when doing aggregations. For your case, the original
> stream may be read from a topic that is partitioned on "K", and you need to
> first repartition on "category" on an intermediate topic before the
> aggregation can be executed.
>
> Hence the old and new value may be sent to two different partitions of the
> intermediate topic, and hence be processed by two different process (it
> won't be the case in your application, since you mentioned the "category"
> will never change). Since the library cannot tell if the groupBy key will
> never change, it has to be conservative and do this subtract / add process
> while receiving the old / new value.
>
>
> Guozhang
>
>
> On Wed, Aug 17, 2016 at 1:45 PM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
>> Hi Guozhang,
>>
>> Thanks for responding.  Ah, I see what you're saying... in the case of
>> an update to the KTable, the aggregator's subtractor result would be
>> necessary if the group-by key changes in the update.
>>
>> It makes sense, but unfortunately the behavior leaves me feeling a
>> little sketchy... when the group-by key doesn't change (which is
>> guaranteed in my case), I'm outputting results that don't correspond
>> at all to the inputs, temporarily.  It's immediately followed by a
>> corrected result.
>>
>> Would it be a feasible optimization to not send the subtractor's
>> result out of the aggregate, only in the case where the groupBy key
>> does not change between the old record and the new record?
>>
>> Mathieu
>>
>> On Wed, Aug 17, 2016 at 2:12 PM, Guozhang Wang  wrote:
>> > Hello Mathieu,
>> >
>> > Note that semantics of KTable aggregations (i.e.
>> "KTable.groupBy.aggregate"
>> > as in 0.10.0) and KStream aggregations (i.e. "KStream.aggregateByKey" as
>> in
>> > 0.10.0) are different, in the sense that when the table is updated (i.e.
>> a
>> > new record with the same key "K1" is received), the old record's effect
>> on
>> > the aggregation need to first be subtracted before the new record's
>> effect
>> > on the aggregation can be added; whereas in the latter case there is no
>> > "old values" that are not overridden, hence only "adder" aggregator is
>> > needed.
>> >
>> > So suppose your updated record on K1 is on a different "category", say:
>> >
>> > K1, {"category": "kafka2", "text": "word1, word2, word3, word4"}
>> >
>> >
>> > Then the aggregated result should be:
>> >
>> > {key: "kafka", value: 2}
>> > {key: "kafka2", value: 4}
>> >
>> >
>> > Does this make sense now?
>> >
>> > Guozhang
>> >
>> >
>> > On Wed, Aug 17, 2016 at 7:59 AM, Mathieu Fenniak <
>> > mathieu.fenn...@replicon.com> wrote:
>> >
>> >> Hello again, kafka-users,
>> >>
>> >> When I aggregate 

Re: KTable aggregations send intermediate results downstream?

2016-08-17 Thread Mathieu Fenniak
Hi Guozhang,

Thanks for responding.  Ah, I see what you're saying... in the case of
an update to the KTable, the aggregator's subtractor result would be
necessary if the group-by key changes in the update.

It makes sense, but unfortunately the behavior leaves me feeling a
little sketchy... when the group-by key doesn't change (which is
guaranteed in my case), I'm outputting results that don't correspond
at all to the inputs, temporarily.  It's immediately followed by a
corrected result.

Would it be a feasible optimization to not send the subtractor's
result out of the aggregate, only in the case where the groupBy key
does not change between the old record and the new record?

Mathieu

On Wed, Aug 17, 2016 at 2:12 PM, Guozhang Wang  wrote:
> Hello Mathieu,
>
> Note that semantics of KTable aggregations (i.e. "KTable.groupBy.aggregate"
> as in 0.10.0) and KStream aggregations (i.e. "KStream.aggregateByKey" as in
> 0.10.0) are different, in the sense that when the table is updated (i.e. a
> new record with the same key "K1" is received), the old record's effect on
> the aggregation need to first be subtracted before the new record's effect
> on the aggregation can be added; whereas in the latter case there is no
> "old values" that are not overridden, hence only "adder" aggregator is
> needed.
>
> So suppose your updated record on K1 is on a different "category", say:
>
> K1, {"category": "kafka2", "text": "word1, word2, word3, word4"}
>
>
> Then the aggregated result should be:
>
> {key: "kafka", value: 2}
> {key: "kafka2", value: 4}
>
>
> Does this make sense now?
>
> Guozhang
>
>
> On Wed, Aug 17, 2016 at 7:59 AM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
>> Hello again, kafka-users,
>>
>> When I aggregate a KTable, a future input that updates a KTable's
>> value for a specific key causes the aggregate's subtractor to be
>> invoked, and then its adder.  This part is great, completely
>> as-expected.
>>
>> But what I didn't expect is that the intermediate result of the
>> subtractor would be sent downstream.  This value doesn't reflect the
>> reality of the inputs to the aggregator, so sending it downstream is
>> effectively sending "corrupt" data to the next processing node.  Is
>> this the expected behavior, or is this a bug?
>>
>> Take for example, a table of blog articles and an aggregator that
>> counts the number of words in each category of the blog:
>>
>> topic: articles
>>   K1, {"category": "kafka", "text": "word1, word2, word3"}
>>   K2, {"category": "kafka", "text": "word1, word2"}
>>
>> articles.groupBy((k,v) -> v.category)
>>   .aggregate(() -> 0,
>> (k,v,t) -> t + v.text.split(" ").length,
>> (k,v,t) -> t - v.text.split(" ").length
>>   )
>>
>> This aggregator will produce {key: "kafka", value: 3}, then {key:
>> "kafka", value: 5}.  If I update one of the blog articles and send a
>> new message to the articles topic:
>>
>>   K1, {"category": "kafka", "text": "word1, word2, word3, word4"}
>>
>> The aggregator will first produce {key: "kafka", value: 2} when the
>> subtractor is called, then will produce {key: "kafka", value: 6} when
>> the adder is called.  The subtractor's calculation does not actually
>> match the reality; K1 was never deleted, it was just updated.
>>
>> Mathieu
>>
>
>
>
> --
> -- Guozhang


KTable aggregations send intermediate results downstream?

2016-08-17 Thread Mathieu Fenniak
Hello again, kafka-users,

When I aggregate a KTable, a future input that updates a KTable's
value for a specific key causes the aggregate's subtractor to be
invoked, and then its adder.  This part is great, completely
as-expected.

But what I didn't expect is that the intermediate result of the
subtractor would be sent downstream.  This value doesn't reflect the
reality of the inputs to the aggregator, so sending it downstream is
effectively sending "corrupt" data to the next processing node.  Is
this the expected behavior, or is this a bug?

Take for example, a table of blog articles and an aggregator that
counts the number of words in each category of the blog:

topic: articles
  K1, {"category": "kafka", "text": "word1, word2, word3"}
  K2, {"category": "kafka", "text": "word1, word2"}

articles.groupBy((k,v) -> v.category)
  .aggregate(() -> 0,
(k,v,t) -> t + v.text.split(" ").length,
(k,v,t) -> t - v.text.split(" ").length
  )

This aggregator will produce {key: "kafka", value: 3}, then {key:
"kafka", value: 5}.  If I update one of the blog articles and send a
new message to the articles topic:

  K1, {"category": "kafka", "text": "word1, word2, word3, word4"}

The aggregator will first produce {key: "kafka", value: 2} when the
subtractor is called, then will produce {key: "kafka", value: 6} when
the adder is called.  The subtractor's calculation does not actually
match the reality; K1 was never deleted, it was just updated.

Mathieu


Re: DLL Hell

2016-08-16 Thread Mathieu Fenniak
I really don't know anything about how cmake works, so, I can't explain
that error for you.  Maybe it needs VS installed to be able to generate
those files, but, I don't know.

You will definitely need to have native Windows build tools installed.  You
can probably avoid having Visual Studio (the IDE); Microsoft does
distribute a full SDK that is standalone from VS and I believe includes
compiler tools.

I have no idea if you could use maven-nar-plugin, but, I do suspect you'd
have to redo an incredible amount of work to reproduce the rocksdb Windows
build.

It might be better to take these questions over to a rocksdb community forum
(eg. https://www.facebook.com/groups/rocksdb.dev/), since they're probably
not of interest to most of kafka-users.

Mathieu


On Tue, Aug 16, 2016 at 7:40 AM, Martin Gainty  wrote:

>
>
>
> > From: mathieu.fenn...@replicon.com
> > Date: Tue, 16 Aug 2016 06:57:16 -0600
> > Subject: Re: DLL Hell
> > To: users@kafka.apache.org
> >
> > Hey Martin,
> >
> > I had to modify the -G argument to that command to include the visual
> > studio year.  If you run "cmake /?", it will output all the available
> > generators.  My cmake looked like:
> >
> > cmake -G "Visual Studio 12 2013 Win64" -DJNI=1 ..
> >
> > I think this is probably a change in cmake since the rocksdb doc was
> > written (
> > https://cmake.org/cmake/help/v3.0/generator/Visual%
> 20Studio%2012%202013.html
> > ).
> > MG>same "informative error"
> >C:\cygwin64\bin\cmake -G "Visual Studio 12 2013 Win64" -DJNI=1
> CMake Error: Could not create named generator Visual Studio 12 2013 Win64
> Generators
>   Unix Makefiles                   = Generates standard UNIX makefiles.
>   Ninja                            = Generates build.ninja files.
>   CodeBlocks - Ninja               = Generates CodeBlocks project files.
>   CodeBlocks - Unix Makefiles      = Generates CodeBlocks project files.
>   CodeLite - Ninja                 = Generates CodeLite project files.
>   CodeLite - Unix Makefiles        = Generates CodeLite project files.
>   Eclipse CDT4 - Ninja             = Generates Eclipse CDT 4.0 project files.
>   Eclipse CDT4 - Unix Makefiles    = Generates Eclipse CDT 4.0 project files.
>   KDevelop3                        = Generates KDevelop 3 project files.
>   KDevelop3 - Unix Makefiles       = Generates KDevelop 3 project files.
>   Kate - Ninja                     = Generates Kate project files.
>   Kate - Unix Makefiles            = Generates Kate project files.
>   Sublime Text 2 - Ninja           = Generates Sublime Text 2 project files.
>   Sublime Text 2 - Unix Makefiles  = Generates Sublime Text 2 project files.
> MG>I am thinking if I want to automate this native build..I could more
> easily create binary thru maven-nar-plugin ?
> MG>as I do not have any MS VS or DotNet installed..maybe I need to install
> many gigs of MS specific VS?
> MG>Please advise
> > Mathieu
> >
> >
> > On Tue, Aug 16, 2016 at 5:03 AM, Martin Gainty 
> wrote:
> >
> > > havent used cmake in over 10 years so Im a bit lost..
> > > cmake -G "Visual Studio 12 Win64" -DGFLAGS=1 -DSNAPPY=1 -DJEMALLOC=1
> > > -DJNI=1
> > > CMake Error: Could not create named generator Visual Studio 12 Win64
> > > ?Please advise
> > > Martin
> > > __
> > >
> > >
> > >
> > > > From: mathieu.fenn...@replicon.com
> > > > Date: Mon, 15 Aug 2016 13:43:47 -0600
> > > > Subject: Re: DLL Hell
> > > > To: users@kafka.apache.org
> > > >
> > > > Hi Martin,
> > > >
> > > > rocksdb does not currently distribute a Windows-compatible build of
> their
> > > > rocksdbjni library.  I recently wrote up some instructions on how to
> > > > produce a local build, which you can find here:
> > > > http://mail-archives.apache.org/mod_mbox/kafka-users/
> > > 201608.mbox/%3CCAHoiPjweo-xSj3TiodcDVf4wNnnJ8u6PcwWDPF7L
> > > T5ps%2BxQ3eA%40mail.gmail.com%3E
> > > >
> > > > I'd also suggest tracking this issue in GitHub, which is likely to be
> > > > updated if this ever changes: https://github.com/facebook/
> > > rocksdb/issues/703
> > > >
> > > > Mathieu
> > > >
> > > >
> > > > On Mon, Aug 15, 2016 at 1:34 PM, Martin Gainty 
> > > wrote:
> > > >
> > > > > kafka-trunk\streams>gradle build
> > > > > Caused by: java.lang.RuntimeException: librocksdbjni-win64.dll was not found inside JAR.
> > > > >     at org.rocksdb.NativeLibraryLoader.loadLibraryFromJarToTemp(NativeLibraryLoader.java:106)
> > > > >     at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
> > > > >     at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
> > > > >     at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:47)
> > > > >     at org.rocksdb.RocksDB.<clinit>(RocksDB.java:23)
> > > > > any idea where I can locate librocksdbjni-win64.dll ?
> > > > > /thanks/
> > > > > Martin
> > > > > __
> > > > >
> > > > >
> > >
> > >
>
>


Re: Automated Testing w/ Kafka Streams

2016-08-16 Thread Mathieu Fenniak
Hi Guozhang,

Thanks for the feedback.  What would you think about
including ProcessorTopologyTestDriver in a released artifact from kafka
streams in a future release?  Or alternatively, what other approach would
you recommend to incorporating it into another project's tests?  I can copy
it wholesale into my project and it works fine, but I'll have to keep it
up-to-date by hand, which isn't ideal. :-)
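
For context, the copied-in driver gets used in my component tests along these
lines.  This is just a sketch from my copy of the internal class as of 0.10.x;
treat the constructor and the process/readOutput signatures as assumptions,
since they may shift between versions.  createTopology() here is whatever
builds the TopologyBuilder under test.

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test");      // any id; no broker is contacted
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder, never used

ProcessorTopologyTestDriver driver =
        new ProcessorTopologyTestDriver(new StreamsConfig(props), createTopology());

// Feed a record into an input topic of the topology...
driver.process("input-topic", "key-1", "some value",
        new StringSerializer(), new StringSerializer());

// ...then assert on what the topology published to its output topic.
ProducerRecord<String, String> output =
        driver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());
assertEquals("key-1", output.key());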

Mathieu


On Mon, Aug 15, 2016 at 3:24 PM, Guozhang Wang  wrote:

> Mathieu,
>
> Your composition of Per-module Unit Tests + ProcessorTopologyTestDriver +
> System Tests looks good to me, and I agree with you that since this is part
> of your pre-commit process, which could be triggered concurrently from
> different developers / teams, EmbeddedSingleNodeKafkaCluster +
> EmbeddedZookeeper may not work best for you.
>
>
> Guozhang
>
>
> On Mon, Aug 15, 2016 at 1:39 PM, Radoslaw Gruchalski  >
> wrote:
>
> > Out of curiosity, are you aware of kafka.util.TestUtils and Apache
> Curator
> > TestingServer?
> > I’m using this successfully to test publish / consume scenarios with
> things
> > like Flink, Spark and custom apps.
> > What would stop you from taking the same approach?
> >
> > –
> > Best regards,
> > Radek Gruchalski
> > ra...@gruchalski.com
> >
> >
> > On August 15, 2016 at 9:41:37 PM, Mathieu Fenniak (
> > mathieu.fenn...@replicon.com) wrote:
> >
> > Hi Michael,
> >
> > It would definitely be an option. I am not currently doing any testing
> > like that; it could replace the ProcessorTopologyTestDriver-style
> testing
> > that I'd like to do, but there are some trade-offs to consider:
> >
> > - I can't do an isolated test of just the TopologyBuilder; I'd be
> > bringing in configuration management code (eg. configuring where to
> access
> > ZK + Kafka).
> > - Tests using a running Kafka server wouldn't have a clear end-point; if
> > something in the toplogy doesn't publish a message where I expected it
> to,
> > my test can only fail via a timeout.
> > - Tests are likely to be slower; this might not be significant, but a
> > small difference in test speed has a big impact in productivity after a
> few
> > months of development
> > - Tests will be more complex & fragile; some additional component needs
> > to manage starting up that Kafka server, making sure it's ready-to-go,
> > running tests, and then tearing it down
> > - Tests will have to be cautious of state existing in Kafka. eg. two
> > test suites that touch the same topics could be influenced by state of a
> > previous test. Either you take a "destroy the world" approach between
> test
> > cases (or test suites), which probably makes test speed much worse, or,
> you
> > find another way to isolate test's state.
> >
> > I'd have to face all these problems at the higher level that I'm calling
> > "systems-level tests", but, I think it would be better to do the majority
> > of the automated testing at a lower level that doesn't bring these
> > considerations into play.
> >
> > Mathieu
> >
> >
> > On Mon, Aug 15, 2016 at 12:13 PM, Michael Noll 
> > wrote:
> >
> > > Mathieu,
> > >
> > > follow-up question: Are you also doing or considering integration
> testing
> > > by spawning a local Kafka cluster and then reading/writing to that
> > cluster
> > > (often called embedded or in-memory cluster)? This approach would be in
> > > the middle between ProcessorTopologyTestDriver (that does not spawn a
> > Kafka
> > > cluster) and your system-level testing (which I suppose is running
> > against
> > > a "real" test Kafka cluster).
> > >
> > > -Michael
> > >
> > >
> > >
> > >
> > >
> > > On Mon, Aug 15, 2016 at 3:44 PM, Mathieu Fenniak <
> > > mathieu.fenn...@replicon.com> wrote:
> > >
> > > > Hey all,
> > > >
> > > > At my workplace, we have a real focus on software automated testing.
> > I'd
> > > > love to be able to test the composition of a TopologyBuilder with
> > > > org.apache.kafka.test.ProcessorTopologyTestDriver
> > > > <https://github.com/apache/kafka/blob/14934157df7aaf5e9c37a302ef9fd9
> > > > 317b95efa4/streams/src/test/java/org/apache/kafka/test/
> > > > ProcessorTopologyTestDriver.java>;
> > > > has there ever been any thought given to making this part of the
> pub

Re: Automated Testing w/ Kafka Streams

2016-08-16 Thread Mathieu Fenniak
Hi Radek,

No, I'm not familiar with these tools.  I see that Curator's TestingServer
looks pretty straight-forward, but, I'm not really sure what
kafka.util.TestUtils
is.  I can't find any documentation referring to this, and it doesn't seem
to be a part of any published maven artifacts in the Kafka project; can you
point me at what you're using a little more specifically?

Mathieu


On Mon, Aug 15, 2016 at 2:39 PM, Radoslaw Gruchalski 
wrote:

> Out of curiosity, are you aware of kafka.util.TestUtils and Apache Curator
> TestingServer?
> I’m using this successfully to test publish / consume scenarios with things
> like Flink, Spark and custom apps.
> What would stop you from taking the same approach?
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On August 15, 2016 at 9:41:37 PM, Mathieu Fenniak (
> mathieu.fenn...@replicon.com) wrote:
>
> Hi Michael,
>
> It would definitely be an option. I am not currently doing any testing
> like that; it could replace the ProcessorTopologyTestDriver-style testing
> that I'd like to do, but there are some trade-offs to consider:
>
> - I can't do an isolated test of just the TopologyBuilder; I'd be
> bringing in configuration management code (eg. configuring where to access
> ZK + Kafka).
> - Tests using a running Kafka server wouldn't have a clear end-point; if
> something in the toplogy doesn't publish a message where I expected it to,
> my test can only fail via a timeout.
> - Tests are likely to be slower; this might not be significant, but a
> small difference in test speed has a big impact in productivity after a
> few
> months of development
> - Tests will be more complex & fragile; some additional component needs
> to manage starting up that Kafka server, making sure it's ready-to-go,
> running tests, and then tearing it down
> - Tests will have to be cautious of state existing in Kafka. eg. two
> test suites that touch the same topics could be influenced by state of a
> previous test. Either you take a "destroy the world" approach between test
> cases (or test suites), which probably makes test speed much worse, or,
> you
> find another way to isolate test's state.
>
> I'd have to face all these problems at the higher level that I'm calling
> "systems-level tests", but, I think it would be better to do the majority
> of the automated testing at a lower level that doesn't bring these
> considerations into play.
>
> Mathieu
>
>
> On Mon, Aug 15, 2016 at 12:13 PM, Michael Noll 
> wrote:
>
> > Mathieu,
> >
> > follow-up question: Are you also doing or considering integration
> testing
> > by spawning a local Kafka cluster and then reading/writing to that
> cluster
> > (often called embedded or in-memory cluster)? This approach would be in
> > the middle between ProcessorTopologyTestDriver (that does not spawn a
> Kafka
> > cluster) and your system-level testing (which I suppose is running
> against
> > a "real" test Kafka cluster).
> >
> > -Michael
> >
> >
> >
> >
> >
> > On Mon, Aug 15, 2016 at 3:44 PM, Mathieu Fenniak <
> > mathieu.fenn...@replicon.com> wrote:
> >
> > > Hey all,
> > >
> > > At my workplace, we have a real focus on software automated testing.
> I'd
> > > love to be able to test the composition of a TopologyBuilder with
> > > org.apache.kafka.test.ProcessorTopologyTestDriver
> > > <https://github.com/apache/kafka/blob/14934157df7aaf5e9c37a302ef9fd9
> > > 317b95efa4/streams/src/test/java/org/apache/kafka/test/
> > > ProcessorTopologyTestDriver.java>;
> > > has there ever been any thought given to making this part of the
> public
> > API
> > > of Kafka Streams?
> > >
> > > For some background, here are some details on the automated testing
> plan
> > > that I have in mind for a Kafka Streams application. Our goal is to
> > enable
> > > continuous deployment of any new development we do, so, it has to be
> > > rigorously tested with complete automation.
> > >
> > > As part of our pre-commit testing, we'd first have these gateways; no
> > code
> > > would reach our master branch without passing these tests:
> > >
> > > - At the finest level, unit tests covering individual pieces like a
> > > Serde, ValueMapper, ValueJoiner, aggregate adder/subtractor, etc.
> > These
> > > pieces are very isolated, very easy to unit test.
> > > - At a higher level, I'd like to have component tests of t

Re: DLL Hell

2016-08-16 Thread Mathieu Fenniak
Hey Martin,

I had to modify the -G argument to that command to include the visual
studio year.  If you run "cmake /?", it will output all the available
generators.  My cmake looked like:

cmake -G "Visual Studio 12 2013 Win64" -DJNI=1 ..

I think this is probably a change in cmake since the rocksdb doc was
written (
https://cmake.org/cmake/help/v3.0/generator/Visual%20Studio%2012%202013.html
).

Mathieu


On Tue, Aug 16, 2016 at 5:03 AM, Martin Gainty  wrote:

> havent used cmake in over 10 years so Im a bit lost..
> cmake -G "Visual Studio 12 Win64" -DGFLAGS=1 -DSNAPPY=1 -DJEMALLOC=1
> -DJNI=1
> CMake Error: Could not create named generator Visual Studio 12 Win64
> ?Please advise
> Martin
> __
>
>
>
> > From: mathieu.fenn...@replicon.com
> > Date: Mon, 15 Aug 2016 13:43:47 -0600
> > Subject: Re: DLL Hell
> > To: users@kafka.apache.org
> >
> > Hi Martin,
> >
> > rocksdb does not currently distribute a Windows-compatible build of their
> > rocksdbjni library.  I recently wrote up some instructions on how to
> > produce a local build, which you can find here:
> > http://mail-archives.apache.org/mod_mbox/kafka-users/
> 201608.mbox/%3CCAHoiPjweo-xSj3TiodcDVf4wNnnJ8u6PcwWDPF7L
> T5ps%2BxQ3eA%40mail.gmail.com%3E
> >
> > I'd also suggest tracking this issue in GitHub, which is likely to be
> > updated if this ever changes: https://github.com/facebook/
> rocksdb/issues/703
> >
> > Mathieu
> >
> >
> > On Mon, Aug 15, 2016 at 1:34 PM, Martin Gainty 
> wrote:
> >
> > > kafka-trunk\streams>gradle build
> > > Caused by: java.lang.RuntimeException: librocksdbjni-win64.dll was not found inside JAR.
> > >     at org.rocksdb.NativeLibraryLoader.loadLibraryFromJarToTemp(NativeLibraryLoader.java:106)
> > >     at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
> > >     at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
> > >     at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:47)
> > >     at org.rocksdb.RocksDB.<clinit>(RocksDB.java:23)
> > > any idea where I can locate librocksdbjni-win64.dll ?
> > > /thanks/
> > > Martin
> > > __
> > >
> > >
>
>


Re: DLL Hell

2016-08-15 Thread Mathieu Fenniak
Hi Martin,

rocksdb does not currently distribute a Windows-compatible build of their
rocksdbjni library.  I recently wrote up some instructions on how to
produce a local build, which you can find here:
http://mail-archives.apache.org/mod_mbox/kafka-users/201608.mbox/%3CCAHoiPjweo-xSj3TiodcDVf4wNnnJ8u6PcwWDPF7LT5ps%2BxQ3eA%40mail.gmail.com%3E

I'd also suggest tracking this issue in GitHub, which is likely to be
updated if this ever changes: https://github.com/facebook/rocksdb/issues/703

Mathieu


On Mon, Aug 15, 2016 at 1:34 PM, Martin Gainty  wrote:

> kafka-trunk\streams>gradle build
> Caused by: java.lang.RuntimeException: librocksdbjni-win64.dll was not found inside JAR.
>     at org.rocksdb.NativeLibraryLoader.loadLibraryFromJarToTemp(NativeLibraryLoader.java:106)
>     at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
>     at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
>     at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:47)
>     at org.rocksdb.RocksDB.<clinit>(RocksDB.java:23)
> any idea where I can locate librocksdbjni-win64.dll ?
> /thanks/
> Martin
> __
>
>


Re: Automated Testing w/ Kafka Streams

2016-08-15 Thread Mathieu Fenniak
Hi Michael,

It would definitely be an option.  I am not currently doing any testing
like that; it could replace the ProcessorTopologyTestDriver-style testing
that I'd like to do, but there are some trade-offs to consider:

   - I can't do an isolated test of just the TopologyBuilder; I'd be
   bringing in configuration management code (eg. configuring where to access
   ZK + Kafka).
   - Tests using a running Kafka server wouldn't have a clear end-point; if
   something in the toplogy doesn't publish a message where I expected it to,
   my test can only fail via a timeout.
   - Tests are likely to be slower; this might not be significant, but a
   small difference in test speed has a big impact in productivity after a few
   months of development
   - Tests will be more complex & fragile; some additional component needs
   to manage starting up that Kafka server, making sure it's ready-to-go,
   running tests, and then tearing it down
   - Tests will have to be cautious of state existing in Kafka.  eg. two
   test suites that touch the same topics could be influenced by state of a
   previous test.  Either you take a "destroy the world" approach between test
   cases (or test suites), which probably makes test speed much worse, or, you
   find another way to isolate test's state.

I'd have to face all these problems at the higher level that I'm calling
"systems-level tests", but, I think it would be better to do the majority
of the automated testing at a lower level that doesn't bring these
considerations into play.

Mathieu


On Mon, Aug 15, 2016 at 12:13 PM, Michael Noll  wrote:

> Mathieu,
>
> follow-up question:  Are you also doing or considering integration testing
> by spawning a local Kafka cluster and then reading/writing to that cluster
> (often called embedded or in-memory cluster)?  This approach would be in
> the middle between ProcessorTopologyTestDriver (that does not spawn a Kafka
> cluster) and your system-level testing (which I suppose is running against
> a "real" test Kafka cluster).
>
> -Michael
>
>
>
>
>
> On Mon, Aug 15, 2016 at 3:44 PM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
> > Hey all,
> >
> > At my workplace, we have a real focus on software automated testing.  I'd
> > love to be able to test the composition of a TopologyBuilder with
> > org.apache.kafka.test.ProcessorTopologyTestDriver
> > <https://github.com/apache/kafka/blob/14934157df7aaf5e9c37a302ef9fd9
> > 317b95efa4/streams/src/test/java/org/apache/kafka/test/
> > ProcessorTopologyTestDriver.java>;
> > has there ever been any thought given to making this part of the public
> API
> > of Kafka Streams?
> >
> > For some background, here are some details on the automated testing plan
> > that I have in mind for a Kafka Streams application.  Our goal is to
> enable
> > continuous deployment of any new development we do, so, it has to be
> > rigorously tested with complete automation.
> >
> > As part of our pre-commit testing, we'd first have these gateways; no
> code
> > would reach our master branch without passing these tests:
> >
> >- At the finest level, unit tests covering individual pieces like a
> >Serde, ValueMapper, ValueJoiner, aggregate adder/subtractor, etc.
> These
> >pieces are very isolated, very easy to unit test.
> >- At a higher level, I'd like to have component tests of the
> composition
> >of the TopologyBuilder; this is where ProcessorTopologyTestDriver
> would
> > be
> >valuable.  There'd be far fewer of these tests than the lower-level
> > tests.
> >There are no external dependencies to these tests, so they'd be very
> > fast.
> >
> > Having passed that level of testing, we'd deploy the Kafka Streams
> > application to an integration testing area where the rest of our
> > application is kept up-to-date, and proceed with these integration tests:
> >
> >- Systems-level tests where we synthesize inputs to the Kafka topics,
> >wait for the Streams app to process the data, and then inspect the
> > output
> >that it pushes into other Kafka topics.  These tests will be fewer in
> >nature than the above tests, but they serve to ensure that the
> > application
> >is well-configured, executing, and handling inputs & outputs as
> > expected.
> >- UI-level tests where we verify behaviors that are expected from the
> >system as a whole.  As our application is a web app, we'd be using
> > Selenium
> >to drive a web browser and verifying interactions and outputs that are
> >expected from the Streams application matching our real-world
> use-cases.
> >These tests are even fewer in nature than the above.
> >
> > This is an adaptation of the automated testing scaffold that we currently
> > use for microservices; I'd love any input on the plan as a whole.
> >
> > Thanks,
> >
> > Mathieu
> >
>


Automated Testing w/ Kafka Streams

2016-08-15 Thread Mathieu Fenniak
Hey all,

At my workplace, we have a real focus on software automated testing.  I'd
love to be able to test the composition of a TopologyBuilder with
org.apache.kafka.test.ProcessorTopologyTestDriver
<https://github.com/apache/kafka/blob/14934157df7aaf5e9c37a302ef9fd9317b95efa4/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java>;
has there ever been any thought given to making this part of the public API
of Kafka Streams?

For some background, here are some details on the automated testing plan
that I have in mind for a Kafka Streams application.  Our goal is to enable
continuous deployment of any new development we do, so, it has to be
rigorously tested with complete automation.

As part of our pre-commit testing, we'd first have these gateways; no code
would reach our master branch without passing these tests:

   - At the finest level, unit tests covering individual pieces like a
   Serde, ValueMapper, ValueJoiner, aggregate adder/subtractor, etc.  These
   pieces are very isolated, very easy to unit test.
   - At a higher level, I'd like to have component tests of the composition
   of the TopologyBuilder; this is where ProcessorTopologyTestDriver would be
   valuable.  There'd be far fewer of these tests than the lower-level tests.
   There are no external dependencies to these tests, so they'd be very fast.

Having passed that level of testing, we'd deploy the Kafka Streams
application to an integration testing area where the rest of our
application is kept up-to-date, and proceed with these integration tests:

   - Systems-level tests where we synthesize inputs to the Kafka topics,
   wait for the Streams app to process the data, and then inspect the output
   that it pushes into other Kafka topics.  These tests will be fewer in
   nature than the above tests, but they serve to ensure that the application
   is well-configured, executing, and handling inputs & outputs as expected.
   - UI-level tests where we verify behaviors that are expected from the
   system as a whole.  As our application is a web app, we'd be using Selenium
   to drive a web browser and verifying interactions and outputs that are
   expected from the Streams application matching our real-world use-cases.
   These tests are even fewer in nature than the above.

This is an adaptation of the automated testing scaffold that we currently
use for microservices; I'd love any input on the plan as a whole.

Thanks,

Mathieu


Re: Streams 'External source topic not found' while using groupBy/aggregate

2016-08-15 Thread Mathieu Fenniak
Hi Guozhang,

I tested your backport branch, and it looks like it works fine.  The same
code that was producing the "Invalid topology building" error is working
correctly against your branch, just like it is with trunk.

Mathieu


On Sun, Aug 14, 2016 at 11:25 PM, Guozhang Wang  wrote:

> Hi Mathieu,
>
> I have a PR against 0.10.0 branch to backport the bug fix plus some
> refactoring, feel free to try it out:
>
>
> https://github.com/apache/kafka/pull/1735
>
>
> Guozhang
>
> On Wed, Aug 10, 2016 at 2:28 PM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
> > Hi Guozhang,
> >
> > Yes, it does seem to be fixed in trunk.  Thanks.  I should have tried
> that,
> > but I assumed that the recently released 0.10.0.1 would be pretty close
> to
> > trunk.  I can see where that was mistaken, since 0.10.0 is quite
> divergent
> > from trunk.
> >
> > Mathieu
> >
> >
> > On Wed, Aug 10, 2016 at 2:39 PM, Guozhang Wang 
> wrote:
> >
> > > Hello Mathieu,
> > >
> > > I think this issue is fixed in trunk but may get missed in the 0.10.0
> > > branch, could you try running your program from trunk to verify if it
> is
> > > the case? If yes we can consider backportting the hotfix from trunk to
> > > 0.10.0 and have another bug fix release.
> > >
> > >
> > > Guozhang
> > >
> > > On Wed, Aug 10, 2016 at 7:32 AM, Mathieu Fenniak <
> > > mathieu.fenn...@replicon.com> wrote:
> > >
> > > > Hey there, Kafka Users,
> > > >
> > > > I'm trying to join two topics with Kafka Streams.  The first topic
> is a
> > > > changelog of one object, and the second is a changelog of a related
> > > > object.  In order to join these tables, I'm grouping the second table
> > by
> > > a
> > > > piece of data in it that indicates what record it is related to in
> the
> > > > first table.  But I'm getting an unexpected error related to the
> > > > repartitioning topic for the aggregated table:
> > > >
> > > > org.apache.kafka.streams.errors.TopologyBuilderException: Invalid
> > > topology
> > > > building: External source topic not found:
> > > > *TableNumber2Aggregated-repartition*
> > > > at
> > > > org.apache.kafka.streams.processor.internals.
> StreamPartitionAssignor.
> > > > ensureCopartitioning(StreamPartitionAssignor.java:452)
> > > > at
> > > > org.apache.kafka.streams.processor.internals.
> StreamPartitionAssignor.
> > > > ensureCopartitioning(StreamPartitionAssignor.java:440)
> > > >
> > > >
> > > > (Full exception:
> > > > https://gist.github.com/mfenniak/11ca081191932fbb33a0c3cc32ad1686)
> > > >
> > > > It appears that the "TableNumber2Aggregated-repartition" topic *is*
> > > > created
> > > > in Kafka by the streams application, but the Kafka topic has a prefix
> > > that
> > > > matches my application id (timesheet-status).  Perhaps something is
> > > > prefixing the topic name, but it isn't being applied everywhere?
> > > >
> > > > $ ./kafka-topics.sh --zookeeper localhost --list
> > > > TableNumber1
> > > > TableNumber2
> > > > __consumer_offsets
> > > > timesheet-status-TableNumber2Aggregated-repartition
> > > >
> > > >
> > > > Here's a sample that reproduces the issue (note, I've cut out all the
> > > > actual mapping, grouping, and aggregating logic, but, this still
> > > reproduces
> > > > the error):
> > > >
> > > > public static TopologyBuilder createTopology() {
> > > > KStreamBuilder builder = new KStreamBuilder();
> > > >
> > > > KTable table1Mapped = builder.table(Serdes.String(), new
> > > > JsonSerde(Map.class), "TableNumber1")
> > > > .mapValues((value) -> null);
> > > >
> > > > KTable table2Aggregated = builder.table(Serdes.String(), new
> > > > JsonSerde(Map.class), "TableNumber2")
> > > > .groupBy((key, value) -> null)
> > > > .aggregate(() -> null, (k, v, t) -> null, (k, v, t) ->
> > null,
> > > > new JsonSerde(Map.class), "TableNumber2Aggregated");
> > > >
> > > > table1Mapped.join(table2Aggregated, (left, right) -> {
> > > > LOG.debug("join");
> > > > return null;
> > > > });
> > > >
> > > > return builder;
> > > > }
> > > >
> > > > I'm using the latest Kafka Streams release, 0.10.0.1.  Any thoughts
> on
> > > how
> > > > I could proceed to debug or workaround this?
> > > >
> > > > Thanks all,
> > > >
> > > > Mathieu
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: Streams 'External source topic not found' while using groupBy/aggregate

2016-08-10 Thread Mathieu Fenniak
Hi Guozhang,

Yes, it does seem to be fixed in trunk.  Thanks.  I should have tried that,
but I assumed that the recently released 0.10.0.1 would be pretty close to
trunk.  I can see where that was mistaken, since 0.10.0 is quite divergent
from trunk.

Mathieu


On Wed, Aug 10, 2016 at 2:39 PM, Guozhang Wang  wrote:

> Hello Mathieu,
>
> I think this issue is fixed in trunk but may get missed in the 0.10.0
> branch, could you try running your program from trunk to verify if it is
> the case? If yes we can consider backportting the hotfix from trunk to
> 0.10.0 and have another bug fix release.
>
>
> Guozhang
>
> On Wed, Aug 10, 2016 at 7:32 AM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
> > Hey there, Kafka Users,
> >
> > I'm trying to join two topics with Kafka Streams.  The first topic is a
> > changelog of one object, and the second is a changelog of a related
> > object.  In order to join these tables, I'm grouping the second table by
> a
> > piece of data in it that indicates what record it is related to in the
> > first table.  But I'm getting an unexpected error related to the
> > repartitioning topic for the aggregated table:
> >
> > org.apache.kafka.streams.errors.TopologyBuilderException: Invalid
> topology
> > building: External source topic not found:
> > *TableNumber2Aggregated-repartition*
> > at
> > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.
> > ensureCopartitioning(StreamPartitionAssignor.java:452)
> > at
> > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.
> > ensureCopartitioning(StreamPartitionAssignor.java:440)
> >
> >
> > (Full exception:
> > https://gist.github.com/mfenniak/11ca081191932fbb33a0c3cc32ad1686)
> >
> > It appears that the "TableNumber2Aggregated-repartition" topic *is*
> > created
> > in Kafka by the streams application, but the Kafka topic has a prefix
> that
> > matches my application id (timesheet-status).  Perhaps something is
> > prefixing the topic name, but it isn't being applied everywhere?
> >
> > $ ./kafka-topics.sh --zookeeper localhost --list
> > TableNumber1
> > TableNumber2
> > __consumer_offsets
> > timesheet-status-TableNumber2Aggregated-repartition
> >
> >
> > Here's a sample that reproduces the issue (note, I've cut out all the
> > actual mapping, grouping, and aggregating logic, but, this still
> reproduces
> > the error):
> >
> > public static TopologyBuilder createTopology() {
> > KStreamBuilder builder = new KStreamBuilder();
> >
> > KTable table1Mapped = builder.table(Serdes.String(), new
> > JsonSerde(Map.class), "TableNumber1")
> > .mapValues((value) -> null);
> >
> > KTable table2Aggregated = builder.table(Serdes.String(), new
> > JsonSerde(Map.class), "TableNumber2")
> > .groupBy((key, value) -> null)
> > .aggregate(() -> null, (k, v, t) -> null, (k, v, t) -> null,
> > new JsonSerde(Map.class), "TableNumber2Aggregated");
> >
> > table1Mapped.join(table2Aggregated, (left, right) -> {
> > LOG.debug("join");
> > return null;
> > });
> >
> > return builder;
> > }
> >
> > I'm using the latest Kafka Streams release, 0.10.0.1.  Any thoughts on
> how
> > I could proceed to debug or workaround this?
> >
> > Thanks all,
> >
> > Mathieu
> >
>
>
>
> --
> -- Guozhang
>


Streams 'External source topic not found' while using groupBy/aggregate

2016-08-10 Thread Mathieu Fenniak
Hey there, Kafka Users,

I'm trying to join two topics with Kafka Streams.  The first topic is a
changelog of one object, and the second is a changelog of a related
object.  In order to join these tables, I'm grouping the second table by a
piece of data in it that indicates what record it is related to in the
first table.  But I'm getting an unexpected error related to the
repartitioning topic for the aggregated table:

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology
building: External source topic not found:
*TableNumber2Aggregated-repartition*
at
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:452)
at
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:440)


(Full exception:
https://gist.github.com/mfenniak/11ca081191932fbb33a0c3cc32ad1686)

It appears that the "TableNumber2Aggregated-repartition" topic *is* created
in Kafka by the streams application, but the Kafka topic has a prefix that
matches my application id (timesheet-status).  Perhaps something is
prefixing the topic name, but it isn't being applied everywhere?

$ ./kafka-topics.sh --zookeeper localhost --list
TableNumber1
TableNumber2
__consumer_offsets
timesheet-status-TableNumber2Aggregated-repartition


Here's a sample that reproduces the issue (note, I've cut out all the
actual mapping, grouping, and aggregating logic, but, this still reproduces
the error):

public static TopologyBuilder createTopology() {
KStreamBuilder builder = new KStreamBuilder();

KTable table1Mapped = builder.table(Serdes.String(), new
JsonSerde(Map.class), "TableNumber1")
.mapValues((value) -> null);

KTable table2Aggregated = builder.table(Serdes.String(), new
JsonSerde(Map.class), "TableNumber2")
.groupBy((key, value) -> null)
.aggregate(() -> null, (k, v, t) -> null, (k, v, t) -> null,
new JsonSerde(Map.class), "TableNumber2Aggregated");

table1Mapped.join(table2Aggregated, (left, right) -> {
LOG.debug("join");
return null;
});

return builder;
}

I'm using the latest Kafka Streams release, 0.10.0.1.  Any thoughts on how
I could proceed to debug or work around this?

Thanks all,

Mathieu


Re: Kafka Streams on Windows?

2016-08-05 Thread Mathieu Fenniak
I took that approach.  It was painful, but, ultimately did get me a working
Windows development environment.

To any who follow in my footsteps, here is my trail:

   1. Upgrade to at least Kafka Streams 0.10.0.1 (currently only in RC).
  - This is necessary because .1 bumps the rocksdb dependency to 4.8.0,
  where the previous 4.4 dependency did not yet support loading a
Windows JNI
  library.
  2. Follow the instructions here (
   https://github.com/facebook/rocksdb/blob/v4.8/CMakeLists.txt) to build
   rocksdb.
  - That link is for the v4.8 tag; be sure to match this with the
  version of rocksdb that Kafka Streams depends upon in the
future.  0.10.0.1
  -> v4.8, but future releases of Kafka Streams will likely depend on newer
  versions.
  - Be sure to include the "-DJNI=1" compile option to build the JNI
  wrapper.
  - None of the third-party dependencies (eg. snappy, jemalloc, etc.)
  seem to be required to get something functional, but, it
probably isn't the
  most efficient build.
  - Ensure that "javac" and "javah" are on your path when running
  cmake; if there are any errors in the cmake output your JNI wrapper
  probably won't build.
  - You may or may not need to make minor patches to make the project
  build in Windows.  It appears that the Windows build is often broken; for
  example, I had to apply this patch:
  https://github.com/facebook/rocksdb/pull/1223/files
  - Phew, that should give you a build\java\Debug\rocksdbjni.dll.  So
  close to the summit... just a horrible hack left...
   3. Copy rocksdbjni.dll to librocksdbjni-win64.dll.
   4. Insert librocksdbjni-win64.dll into your rocksdbjni-4.8.0.jar.
  - Ugh, this is the horrible hack.  rocksdbjni seems to only look
  inside its own jar for its native libraries, so, this is where
it needs to
  be.
  - If you're a gradle user on Windows, you'd find this jar file
  in 
C:\Users\...your-windows-user...\.gradle\caches\modules-2\files-2.1\org.rocksdb\rocksdbjni\4.8.0\b543fc4ea5b52ad790730dee376ba0df06d9f5f7.

And there you go, almost a working Kafka Streams app in Windows.  One other
detail is that the default state storage directory doesn't seem to be
created on demand, so I had to mkdir C:\tmp\kafka-streams myself before my
app would work.
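
If you'd rather not have a C:\tmp at all, the state directory can also be
pointed somewhere else through StreamsConfig.  Just a sketch, with placeholder
application id, broker, and ZooKeeper addresses; based on what I saw, you may
still need to create the directory yourself:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");     // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");  // placeholder; 0.10.0.x still uses ZK
// Replace the /tmp/kafka-streams default with a Windows-friendly location.
props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\kafka-streams-state");

KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));  // "builder" is your TopologyBuilder
streams.start();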

Mathieu


On Fri, Aug 5, 2016 at 3:13 AM, Eno Thereska  wrote:

> Hi Mathieu,
>
> It is true that the DSL currently does not support configuration of the
> stores.
>
> Sounds like it might be worth trying to build RocksDb and dropping into
> classpath for now.
>
> Eno
>
> > On 4 Aug 2016, at 17:42, Mathieu Fenniak 
> wrote:
> >
> > Hi Eno,
> >
> > Yes, I've looked at that.  RocksDB can be built and run in Windows, but,
> > the JNI wrapper does not include Windows binarie (
> > https://github.com/facebook/rocksdb/issues/703).  rocksdbjni-4.4.1.jar
> > includes librocksdbjni-linux32.so, librocksdbjni-linux64.so, and
> > librocksdbjni-osx.jnilib, so only supports Linux x86 & x64 and OS X.  It
> is
> > probably possible for me to build it myself and drop it in my classpath,
> > but, I'm looking for a lower friction approach if one exists. :-)
> >
> > It looks like this was discussed recently on the Confluent Platform
> mailing
> > list (https://groups.google.com/forum/#!topic/confluent-
> platform/Z1rsfSNrVJk)
> > and the conclusion there was that high-level streams DSL doesn't support
> > configuration of the stores.
> >
> > Mathieu
> >
> >
> > On Thu, Aug 4, 2016 at 10:28 AM, Eno Thereska 
> > wrote:
> >
> >> Hi Mathieu,
> >>
> >> Have you had a chance to look at http://rocksdb.org/blog/2033/
> >> rocksdb-is-now-available-in-windows-platform/? <
> >> http://rocksdb.org/blog/2033/rocksdb-is-now-available-in-
> windows-platform/?>
> >> Curious to hear your and other's comments on whether that worked.
> >>
> >> It is possible to have Kafka Streams use an in-memory store (included
> with
> >> Kafka Streams) for development purposes. In that scenario RocksDb would
> not
> >> be needed.
> >>
> >> Eno
> >>
> >>
> >>> On 4 Aug 2016, at 16:14, Mathieu Fenniak  >
> >> wrote:
> >>>
> >>> Hey all,
> >>>
> >>> Is it anyone developing Kafka Streams applications on Windows?
> >>>
> >>> It seems like the RocksDB Java library doesn't include a native JNI
> >> library
> >>> for Windows, which prevents a Kafka Streams app from running on
> >> Windows.  I
> >>> was just wonder

Re: Kafka Streams on Windows?

2016-08-04 Thread Mathieu Fenniak
Hi Eno,

Yes, I've looked at that.  RocksDB can be built and run in Windows, but,
the JNI wrapper does not include Windows binaries (
https://github.com/facebook/rocksdb/issues/703).  rocksdbjni-4.4.1.jar
includes librocksdbjni-linux32.so, librocksdbjni-linux64.so, and
librocksdbjni-osx.jnilib, so only supports Linux x86 & x64 and OS X.  It is
probably possible for me to build it myself and drop it in my classpath,
but, I'm looking for a lower friction approach if one exists. :-)

It looks like this was discussed recently on the Confluent Platform mailing
list (https://groups.google.com/forum/#!topic/confluent-platform/Z1rsfSNrVJk)
and the conclusion there was that high-level streams DSL doesn't support
configuration of the stores.

Mathieu


On Thu, Aug 4, 2016 at 10:28 AM, Eno Thereska 
wrote:

> Hi Mathieu,
>
> Have you had a chance to look at http://rocksdb.org/blog/2033/
> rocksdb-is-now-available-in-windows-platform/? <
> http://rocksdb.org/blog/2033/rocksdb-is-now-available-in-windows-platform/?>
> Curious to hear your and other's comments on whether that worked.
>
> It is possible to have Kafka Streams use an in-memory store (included with
> Kafka Streams) for development purposes. In that scenario RocksDb would not
> be needed.
>
> Eno
>
>
> > On 4 Aug 2016, at 16:14, Mathieu Fenniak 
> wrote:
> >
> > Hey all,
> >
> > Is it anyone developing Kafka Streams applications on Windows?
> >
> > It seems like the RocksDB Java library doesn't include a native JNI
> library
> > for Windows, which prevents a Kafka Streams app from running on
> Windows.  I
> > was just wondering if others have run into this, and if so, what approach
> > you took to resolve it.
> >
> > I'm favouring the idea of running my applications in a Vagrant VM to
> avoid
> > the issue.  It makes the Windows development environment a little less
> > pleasant, but, seems plausible.
> >
> > Other ideas that occurred to me:
> >
> >   - RocksDB does support Windows, but, there don't seem to be any
> binaries
> >   available or packaged for it or the jni library.  I could probably
> build
> >   these myself, but it sounds a little painful.
> >   - Not developing on Windows.  Works for me, but, won't work as well for
> >   my colleagues.
> >   - Is it possible to make Kafka Streams not use any local state storage
> >   in some kind of development mode?  Not sure...
> >
> > Thanks,
> >
> > Mathieu
>
>


Kafka Streams on Windows?

2016-08-04 Thread Mathieu Fenniak
Hey all,

Is anyone developing Kafka Streams applications on Windows?

It seems like the RocksDB Java library doesn't include a native JNI library
for Windows, which prevents a Kafka Streams app from running on Windows.  I
was just wondering if others have run into this, and if so, what approach
you took to resolve it.

I'm favouring the idea of running my applications in a Vagrant VM to avoid
the issue.  It makes the Windows development environment a little less
pleasant, but, seems plausible.

Other ideas that occurred to me:

   - RocksDB does support Windows, but, there don't seem to be any binaries
   available or packaged for it or the jni library.  I could probably build
   these myself, but it sounds a little painful.
   - Not developing on Windows.  Works for me, but, won't work as well for
   my colleagues.
   - Is it possible to make Kafka Streams not use any local state storage
   in some kind of development mode?  Not sure...

Thanks,

Mathieu


Re: Kafka Streams/Connect for Persistence?

2016-07-22 Thread Mathieu Fenniak
Hm, cool.  Thanks Gwen and Guozhang.

Loose-coupling (especially with regard to the number of instances running),
batch inserts, and exactly-once are very convincing.  Dynamic schema is
interesting / scary, but, I'd need a dynamic app on the other side which I
don't have. :-)

I'll plod along with KS-foreach until the JDBC sink connector is available,
but, would definitely pick up the JDBC sink connector and give it a try
when available.

Thanks,

Mathieu


On Thu, Jul 21, 2016 at 7:07 PM, Gwen Shapira  wrote:

> In addition, our soon-to-be-released JDBC sink connector uses the
> Connect framework to do things that are kind of annoying to do
> yourself:
> * Convert data types
> * create tables if needed, add columns to tables if needed based on
> the data in Kafka
> * support for both insert and upsert
> * configurable batch inserts
> * exactly-once from Kafka to DB (using upserts)
>
> We'll notify you when we open the repository. Just a bit of cleanup left :)
>
>
> On Thu, Jul 21, 2016 at 1:45 PM, Guozhang Wang  wrote:
> > Hi Mathieu,
> >
> > I'm cc'ing Ewen for answering your question as well, but here are my two
> > cents:
> >
> > 1. One benefit of piping end result from KS to KC rather than using
> > .foreach() in KS directly is that you can have a loose coupling between
> > data processing and data copying. For example, for the latter approach
> the
> > number of JDBC connections is tied to the number of your KS instances,
> > while in practice you may want to have a different number.
> >
> > 2. We are working on end-to-end exactly-once semantics right now, which
> is
> > a big project involving both KS and KC. From the KS point of view, any
> > logic inside the foreach() call is a "black-box" to it and any
> side-effects
> > it may result in is not considered in its part of the exactly-once
> > semantics; whereas with KC it has full knowledge about the connector and
> > hence can achieve exactly-once as well for copying data to your RDBMS.
> >
> >
> > Guozhang
> >
> > On Thu, Jul 21, 2016 at 6:49 AM, Mathieu Fenniak <
> > mathieu.fenn...@replicon.com> wrote:
> >
> >> Hello again, Kafka users,
> >>
> >> My end goal is to get stream-processed data into a PostgreSQL database.
> >>
> >> I really like the architecture that Kafka Streams takes; it's "just" a
> >> library, I can build a normal Java application around it and deal with
> >> configuration and orchestration myself.  To persist my data, it's easy
> to
> >> add a .foreach() to the end of my topology and upsert data into my DB
> with
> >> jdbc.
> >>
> >> I'm interpreting based upon the docs that the recommended approach
> would be
> >> to send my final data back to a Kafka topic, and use Connect with a
> sink to
> >> persist that data.  That seems really interesting, but it's another
> complex
> >> moving part that I could do without.
> >>
> >> What advantages does Kafka Connect provide that I would be missing out
> on
> >> by persisting my data directly from my Kafka Streams application?
> >>
> >> Thanks,
> >>
> >> Mathieu
> >>
> >
> >
> >
> > --
> > -- Guozhang
>


Kafka Streams/Connect for Persistence?

2016-07-21 Thread Mathieu Fenniak
Hello again, Kafka users,

My end goal is to get stream-processed data into a PostgreSQL database.

I really like the architecture that Kafka Streams takes; it's "just" a
library, I can build a normal Java application around it and deal with
configuration and orchestration myself.  To persist my data, it's easy to
add a .foreach() to the end of my topology and upsert data into my DB with
jdbc.
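
Concretely, the persistence end looks something like this sketch.  Here
"resultTable" stands for the final KTable of my topology (String keys and
values for simplicity), "dataSource" is an already-configured
javax.sql.DataSource, and the table/column names are placeholders; the upsert
uses PostgreSQL's ON CONFLICT syntax.

resultTable.foreach((key, value) -> {
    try (Connection conn = dataSource.getConnection();
         PreparedStatement stmt = conn.prepareStatement(
                 "INSERT INTO results (id, payload) VALUES (?, ?) " +
                 "ON CONFLICT (id) DO UPDATE SET payload = EXCLUDED.payload")) {
        stmt.setString(1, key);
        stmt.setString(2, value);
        stmt.executeUpdate();
    } catch (SQLException e) {
        // ForeachAction can't throw checked exceptions, so wrap and rethrow.
        throw new RuntimeException("Failed to upsert record " + key, e);
    }
});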

My reading of the docs is that the recommended approach would be
to send my final data back to a Kafka topic, and use Connect with a sink to
persist that data.  That seems really interesting, but it's another complex
moving part that I could do without.

What advantages does Kafka Connect provide that I would be missing out on
by persisting my data directly from my Kafka Streams application?

Thanks,

Mathieu


Re: Kafka Streams: KTable join + filter + foreach

2016-07-20 Thread Mathieu Fenniak
I'm using the 0.10.0.0 release.

Matthias's suggestion of using .toStream().filter(...).foreach(...) does
prevent the nulls from reaching the foreach.  But
.filter(...).foreach(...) does not; the filter's predicate is not even
executed before the ForeachAction receives the null records.
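
For anyone else hitting this, the shape that works for me looks like the
following (sketched in Java here, though my actual code is Kotlin):

// Convert the joined KTable to a changelog stream first; filtering that
// stream does drop the tombstone/intermediate records before the foreach.
timesheetStatus
        .toStream()
        .filter((timesheetKey, status) -> status != null)
        .foreach((timesheetKey, status) ->
                System.out.println("EXECUTING ForeachAction: " + timesheetKey + ", status: " + status));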



On Wed, Jul 20, 2016 at 12:30 PM, Matthias J. Sax 
wrote:

> Try to do a
>
> .toStream().filter(...).foreach(...)
>
>
> -Matthias
>
>
> On 07/20/2016 08:11 PM, Guozhang Wang wrote:
> > Are you using the 0.10.0.0 release or from trunk?
> >
> > On Wed, Jul 20, 2016 at 10:58 AM, Mathieu Fenniak <
> > mathieu.fenn...@replicon.com> wrote:
> >
> >> Hi Guozhang,
> >>
> >> Yes, I tried to apply the filter on the KTable that came from join, and
> >> then the foreach on the KTable that came from filter.  I was still
> getting
> >> the nulls through to my foreach.
> >>
> >> It is easy to workaround, but, the behaviour was especially surprising
> when
> >> the filter didn't prevent it.
> >>
> >> Mathieu
> >>
> >>
> >> On Wed, Jul 20, 2016 at 11:57 AM, Guozhang Wang 
> >> wrote:
> >>
> >>> Hi Mathieu,
> >>>
> >>> As Matthias said, we are working on improving the current join
> semantics:
> >>>
> >>>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=63407287
> >>>
> >>> and will keep you updated.
> >>>
> >>>
> >>> As for KTable.filter(), I think it can actually achieve want you want:
> >> not
> >>> forwarding nulls to the downstream operators; have you tried it out but
> >>> find it is not working?
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>>
> >>> On Wed, Jul 20, 2016 at 7:42 AM, Mathieu Fenniak <
> >>> mathieu.fenn...@replicon.com> wrote:
> >>>
> >>>> Hm... OK, I think that makes sense.
> >>>>
> >>>> It seems like I can't filter out those tombstone records; is that
> >>> expected
> >>>> as well?  If I throw in a .filter operation before my foreach, its
> >>>> Predicate is not invoked, and the foreach's ForeachAction is invoked
> >>> with a
> >>>> null value still.
> >>>>
> >>>> Mathieu
> >>>>
> >>>>
> >>>> On Wed, Jul 20, 2016 at 8:23 AM, Matthias J. Sax <
> >> matth...@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> Hi Mathieu,
> >>>>>
> >>>>> join semantics are tricky. We are still working on a better
> >>>>> documentation for it...
> >>>>>
> >>>>> For the current state and your question:
> >>>>>
> >>>>> Each time a record is processed, it looks up the other KTable to see
> >> if
> >>>>> there is a matching record. If non is found, the join result is empty
> >>>>> and a tombstone record with  is sent downstream. This
> >>> happens,
> >>>>> to delete any (possible existing) previous join result for this key
> >> --
> >>>>> keep in mind, that the result is a KTable containing the current
> >> state
> >>>>> of the join.
> >>>>>
> >>>>> This happens both ways, thus, if your first records of each stream do
> >>>>> not match on the key, both result in a  message to delete
> >>>>> possible existing join-tuples in the result KTable.
> >>>>>
> >>>>> Does this make sense to you?
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 07/20/2016 04:09 PM, Mathieu Fenniak wrote:
> >>>>>> Hello Kafka users,
> >>>>>>
> >>>>>> I'm seeing some unexpected results when using Kafka Streams, and I
> >>> was
> >>>>>> hoping someone could explain them to me.  I have two streams, which
> >>>> I've
> >>>>>> converted KStream->KTable, and then I am joining them together
> >> with a
> >>>>>> "join" (not an outer join, not a full join).  With the resulting
> >>> KTable
> >>>>>> from the join, I am performing a foreach.
> >>>>>>
> &

Re: Kafka Streams: KTable join + filter + foreach

2016-07-20 Thread Mathieu Fenniak
Hi Guozhang,

Yes, I tried to apply the filter on the KTable that came from join, and
then the foreach on the KTable that came from filter.  I was still getting
the nulls through to my foreach.

It is easy to workaround, but, the behaviour was especially surprising when
the filter didn't prevent it.

Mathieu


On Wed, Jul 20, 2016 at 11:57 AM, Guozhang Wang  wrote:

> Hi Mathieu,
>
> As Matthias said, we are working on improving the current join semantics:
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=63407287
>
> and will keep you updated.
>
>
> As for KTable.filter(), I think it can actually achieve want you want: not
> forwarding nulls to the downstream operators; have you tried it out but
> find it is not working?
>
>
> Guozhang
>
>
>
> On Wed, Jul 20, 2016 at 7:42 AM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
> > Hm... OK, I think that makes sense.
> >
> > It seems like I can't filter out those tombstone records; is that
> expected
> > as well?  If I throw in a .filter operation before my foreach, its
> > Predicate is not invoked, and the foreach's ForeachAction is invoked
> with a
> > null value still.
> >
> > Mathieu
> >
> >
> > On Wed, Jul 20, 2016 at 8:23 AM, Matthias J. Sax 
> > wrote:
> >
> > > Hi Mathieu,
> > >
> > > join semantics are tricky. We are still working on a better
> > > documentation for it...
> > >
> > > For the current state and your question:
> > >
> > > Each time a record is processed, it looks up the other KTable to see if
> > > there is a matching record. If non is found, the join result is empty
> > > and a tombstone record with  is sent downstream. This
> happens,
> > > to delete any (possible existing) previous join result for this key --
> > > keep in mind, that the result is a KTable containing the current state
> > > of the join.
> > >
> > > This happens both ways, thus, if your first records of each stream do
> > > not match on the key, both result in a  message to delete
> > > possible existing join-tuples in the result KTable.
> > >
> > > Does this make sense to you?
> > >
> > > -Matthias
> > >
> > > On 07/20/2016 04:09 PM, Mathieu Fenniak wrote:
> > > > Hello Kafka users,
> > > >
> > > > I'm seeing some unexpected results when using Kafka Streams, and I
> was
> > > > hoping someone could explain them to me.  I have two streams, which
> > I've
> > > > converted KStream->KTable, and then I am joining them together with a
> > > > "join" (not an outer join, not a full join).  With the resulting
> KTable
> > > > from the join, I am performing a foreach.
> > > >
> > > > When I startup my Streams application, my foreach receives two
> records
> > > with
> > > > valid keys but null values *before* my ValueJoiner ever gets
> executed.
> > > Why
> > > > would that be?
> > > >
> > > > Code excerpt; please excuse the use of Kotlin here:
> > > >
> > > > val builder = KStreamBuilder()
> > > >
> > > > val approvalStatus = builder.table(
> > > > Serdes.String(),
> > > > JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
> > > > "TimesheetApprovalStatusChanged"
> > > > )
> > > >
> > > > val timesheetLastApprovalAction = builder.table(
> > > > Serdes.String(),
> > > > JsonSerde(Map::class.java),
> > > > "TimesheetApprovalActionPerformed"
> > > > )
> > > >
> > > > val timesheetStatus =
> approvalStatus.join(timesheetLastApprovalAction,
> > {
> > > > approvalStatus, lastApprovalAction ->
> > > > println("EXECUTING ValueJoiner")
> > > > computeTimesheetStatus(approvalStatus.approvalStatus!!,
> > > lastApprovalAction
> > > > as Map)
> > > > })
> > > >
> > > > timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
> > > > println("EXECUTING ForeachAction: $timesheetKey, status:
> > > > $timesheetStatus")
> > > > if (timesheetStatus == null) {
> > > > println("SKIPPING NULL I DON'T UNDERSTAND")
> > > > }
> > > > })
> > > >
> > > >
> > > > Resulting console output:
> > > >
> > > > EXECUTING ForeachAction:
> mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982,
> > > > status: null
> > > > SKIPPING NULL I DON'T UNDERSTAND
> > > > EXECUTING ForeachAction:
> mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982,
> > > > status: null
> > > > SKIPPING NULL I DON'T UNDERSTAND
> > > > EXECUTING ValueJoiner
> > > > EXECUTING ForeachAction:
> mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982,
> > > > status: urn:replicon:timesheet-status:submitting
> > > >
> > > >
> > > > Any explanation on why the foreach would be executing for data that
> > > hasn't
> > > > been generated by my join?
> > > >
> > > > Thanks,
> > > >
> > > > Mathieu
> > > >
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: Kafka Streams: KTable join + filter + foreach

2016-07-20 Thread Mathieu Fenniak
Hm... OK, I think that makes sense.

It seems like I can't filter out those tombstone records; is that expected
as well?  If I throw in a .filter operation before my foreach, its
Predicate is not invoked, and the foreach's ForeachAction is invoked with a
null value still.

Mathieu


On Wed, Jul 20, 2016 at 8:23 AM, Matthias J. Sax 
wrote:

> Hi Mathieu,
>
> join semantics are tricky. We are still working on a better
> documentation for it...
>
> For the current state and your question:
>
> Each time a record is processed, it looks up the other KTable to see if
> there is a matching record. If none is found, the join result is empty
> and a tombstone record with  is sent downstream. This happens,
> to delete any (possible existing) previous join result for this key --
> keep in mind, that the result is a KTable containing the current state
> of the join.
>
> This happens both ways, thus, if your first records of each stream do
> not match on the key, both result in a  message to delete
> possible existing join-tuples in the result KTable.
>
> Does this make sense to you?
>
> -Matthias
>
> On 07/20/2016 04:09 PM, Mathieu Fenniak wrote:
> > Hello Kafka users,
> >
> > I'm seeing some unexpected results when using Kafka Streams, and I was
> > hoping someone could explain them to me.  I have two streams, which I've
> > converted KStream->KTable, and then I am joining them together with a
> > "join" (not an outer join, not a full join).  With the resulting KTable
> > from the join, I am performing a foreach.
> >
> > When I startup my Streams application, my foreach receives two records
> with
> > valid keys but null values *before* my ValueJoiner ever gets executed.
> Why
> > would that be?
> >
> > Code excerpt; please excuse the use of Kotlin here:
> >
> > val builder = KStreamBuilder()
> >
> > val approvalStatus = builder.table(
> > Serdes.String(),
> > JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
> > "TimesheetApprovalStatusChanged"
> > )
> >
> > val timesheetLastApprovalAction = builder.table(
> > Serdes.String(),
> > JsonSerde(Map::class.java),
> > "TimesheetApprovalActionPerformed"
> > )
> >
> > val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction, {
> > approvalStatus, lastApprovalAction ->
> > println("EXECUTING ValueJoiner")
> > computeTimesheetStatus(approvalStatus.approvalStatus!!,
> lastApprovalAction
> > as Map)
> > })
> >
> > timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
> > println("EXECUTING ForeachAction: $timesheetKey, status:
> > $timesheetStatus")
> > if (timesheetStatus == null) {
> > println("SKIPPING NULL I DON'T UNDERSTAND")
> > }
> > })
> >
> >
> > Resulting console output:
> >
> > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982,
> > status: null
> > SKIPPING NULL I DON'T UNDERSTAND
> > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982,
> > status: null
> > SKIPPING NULL I DON'T UNDERSTAND
> > EXECUTING ValueJoiner
> > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982,
> > status: urn:replicon:timesheet-status:submitting
> >
> >
> > Any explanation on why the foreach would be executing for data that
> hasn't
> > been generated by my join?
> >
> > Thanks,
> >
> > Mathieu
> >
>
>


Kafka Streams: KTable join + filter + foreach

2016-07-20 Thread Mathieu Fenniak
Hello Kafka users,

I'm seeing some unexpected results when using Kafka Streams, and I was
hoping someone could explain them to me.  I have two streams, which I've
converted KStream->KTable, and then I am joining them together with a
"join" (not an outer join, not a full join).  With the resulting KTable
from the join, I am performing a foreach.

When I startup my Streams application, my foreach receives two records with
valid keys but null values *before* my ValueJoiner ever gets executed.  Why
would that be?

Code excerpt; please excuse the use of Kotlin here:

val builder = KStreamBuilder()

val approvalStatus = builder.table(
    Serdes.String(),
    JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
    "TimesheetApprovalStatusChanged"
)

val timesheetLastApprovalAction = builder.table(
    Serdes.String(),
    JsonSerde(Map::class.java),
    "TimesheetApprovalActionPerformed"
)

val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction, {
    approvalStatus, lastApprovalAction ->
    println("EXECUTING ValueJoiner")
    computeTimesheetStatus(approvalStatus.approvalStatus!!, lastApprovalAction as Map)
})

timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
    println("EXECUTING ForeachAction: $timesheetKey, status: $timesheetStatus")
    if (timesheetStatus == null) {
        println("SKIPPING NULL I DON'T UNDERSTAND")
    }
})


Resulting console output:

EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982,
status: null
SKIPPING NULL I DON'T UNDERSTAND
EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982,
status: null
SKIPPING NULL I DON'T UNDERSTAND
EXECUTING ValueJoiner
EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982,
status: urn:replicon:timesheet-status:submitting


Any explanation on why the foreach would be executing for data that hasn't
been generated by my join?

Thanks,

Mathieu


Re: kafka-streams depends upon slf4j-log4j12

2016-07-19 Thread Mathieu Fenniak
Thanks for the confirmation Guozhang.  I've submitted a PR along these
lines.  https://github.com/apache/kafka/pull/1639


On Tue, Jul 19, 2016 at 3:50 PM, Guozhang Wang  wrote:

> This is a good find. I think we should just include the api as compile
> dependency, and probably only log4j12 as test dependency. Similarly to
> Kafka Clients and Connect:
>
> <dependency>
>   <groupId>org.slf4j</groupId>
>   <artifactId>slf4j-log4j12</artifactId>
>   <version>1.7.21</version>
>   <scope>test</scope>
> </dependency>
> <dependency>
>   <groupId>org.slf4j</groupId>
>   <artifactId>slf4j-api</artifactId>
>   <version>1.7.21</version>
>   <scope>compile</scope>
> </dependency>
>
>
> Guozhang
>
>
> On Tue, Jul 19, 2016 at 10:39 AM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
> > Hello Kafka users,
> >
> > I'm starting a new project and experimenting with kafka-streams.  It's
> > pretty great, so far; thanks to everyone involved in the development.
> >
> > I noticed that kafka-streams 0.10.0.0 has a dependency on slf4j-log4j12
> >  (see:
> >
> >
> https://repo1.maven.org/maven2/org/apache/kafka/kafka-streams/0.10.0.0/kafka-streams-0.10.0.0.pom
> > ).
> > This doesn't seem correct to me based upon my limited knowledge of slf4j,
> > as it implies that the consuming application will be using log4j rather
> > than another slf4j supported logging framework.
> >
> > I've been able to use another logging framework (logback) by excluding
> this
> > dependency in my build.gradle, as below, but I wanted to ask if this
> > dependency is a mistake, or intentional?
> >
> >   compile(group: 'org.apache.kafka', name: 'kafka-streams', version:
> > '0.10.0.0') {
> > exclude module: 'slf4j-log4j12'
> >   }
> >
> > Thanks,
> >
> > Mathieu Fenniak
> >
>
>
>
> --
> -- Guozhang
>


kafka-streams depends upon slf4j-log4j12

2016-07-19 Thread Mathieu Fenniak
Hello Kafka users,

I'm starting a new project and experimenting with kafka-streams.  It's
pretty great, so far; thanks to everyone involved in the development.

I noticed that kafka-streams 0.10.0.0 has a dependency on slf4j-log4j12
 (see:
https://repo1.maven.org/maven2/org/apache/kafka/kafka-streams/0.10.0.0/kafka-streams-0.10.0.0.pom).
This doesn't seem correct to me based upon my limited knowledge of slf4j,
as it implies that the consuming application will be using log4j rather
than another slf4j supported logging framework.

I've been able to use another logging framework (logback) by excluding this
dependency in my build.gradle, as below, but I wanted to ask if this
dependency is a mistake, or intentional?

  compile(group: 'org.apache.kafka', name: 'kafka-streams', version:
'0.10.0.0') {
exclude module: 'slf4j-log4j12'
  }

Thanks,

Mathieu Fenniak