Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams

2018-01-16 Thread Jeff Klukas
From what I can tell, global state stores are managed separately from other
state stores and are accessed via different methods.

Do the proposed methods on TopologyTestDriver (such as getStateStore) cover
global stores? If not, can we add an interface for accessing and testing
global stores in the scope of this KIP?
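
For reference, here is the shape of test I have in mind, based on my reading
of the WIP PR -- the topology, store name, and serdes are illustrative, and
the factory/pipeInput API is the KIP's proposed shape rather than anything
final:

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    TopologyTestDriver driver = new TopologyTestDriver(topology, config);

    // Pipe a record through the topology.
    ConsumerRecordFactory<String, Long> factory = new ConsumerRecordFactory<>(
        "input-topic", new StringSerializer(), new LongSerializer());
    driver.pipeInput(factory.create("input-topic", "key", 42L));

    // The call in question: does this also resolve global state stores?
    KeyValueStore<String, Long> store = driver.getKeyValueStore("my-store");
    assertEquals(Long.valueOf(42L), store.get("key"));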

On Thu, Jan 11, 2018 at 9:06 PM, Matthias J. Sax 
wrote:

> Dear Kafka community,
>
> I want to propose KIP-247 to add public test utils to the Streams API.
> The goal is to simplify testing of Kafka Streams applications.
>
> Please find details in the wiki:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams
>
> This is an initial KIP, and we hope to add more utility functions later.
> Thus, this KIP is not comprehensive but a first step. Of course, we can
> enrich this initial KIP if we think it falls too short. But we should
> not aim to be comprehensive to keep the scope manageable.
>
> In fact, I think we should add some more helpers to simplify result
> verification. I will update the KIP with this asap. Just wanted to start
> the discussion early on.
>
> An initial WIP PR can be found here:
> https://github.com/apache/kafka/pull/4402
>
> I also included the user-list (please hit "reply-all" to include both
> lists in this KIP discussion).
>
> Thanks a lot.
>
>
> -Matthias
>
>
>


Metrics for Kafka Connect

2017-09-12 Thread Jeff Klukas
The Kafka docs on Monitoring don't mention anything specific for Kafka
Connect. Are metrics for connectors limited to just the standard
consumer/producer metrics? Do I understand correctly that the Connect API
doesn't provide any hooks for custom connector-specific metrics? If it
doesn't, is adding such hooks on the roadmap at all?
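
In the meantime, the embedded producer/consumer metrics are at least visible
over JMX; here is a sketch of reading one from inside the worker JVM. The
MBean and attribute names are the standard client ones, which I'm assuming
(not verified) carry over to Connect's embedded clients:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    // Checked exceptions omitted for brevity; adapt with a remote
    // JMXConnector if you're not in the worker JVM.
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    ObjectName pattern =
        new ObjectName("kafka.producer:type=producer-metrics,client-id=*");
    for (ObjectName name : server.queryNames(pattern, null)) {
        Object rate = server.getAttribute(name, "record-send-rate");
        System.out.println(name.getKeyProperty("client-id") + " -> " + rate);
    }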


Documentation for 0.10.1.1

2017-01-10 Thread Jeff Klukas
I'm starting to look at upgrading to 0.10.1.1, but it looks like the docs
have not been updated since 0.10.1.0.

Are there any plans to update the docs to explicitly discuss how to upgrade
from 0.10.1.0 -> 0.10.1.1, and 0.10.0.X -> 0.10.1.1?


Kafka Connect startup issues

2016-09-12 Thread Jeff Klukas
I'm doing some local testing on my Mac to get a feel for Kafka Connect, and
I'm running into several issues.

First, when I untar the Kafka 0.10.0.1 source and run
`./bin/connect-distributed.sh config/connect-distributed.properties`, I get
a "Usage" message. By digging through scripts a bit, I found that the
CLASSPATH variable ends up being empty, so it's interpreting the properties
file as the classpath and thus thinks no properties file was passed.

In order to get past that hurdle, I had to find the compiled
`connect-runtime` jar and specifically set the classpath to look at that
jar. Is it expected that the connect-distributed.sh script should be usable
directly from the Kafka source? Am I missing documentation about setup?

Second, I'm finding that I can't get the Kafka Connect process to shut down
cleanly. The DistributedHerder always hangs on shutdown. When I hit Ctrl-C,
I see the following log messages:

INFO  [2016-09-10 01:11:40,430] org.apache.kafka.connect.runtime.Connect: Kafka Connect stopping
INFO  [2016-09-10 01:11:40,430] org.apache.kafka.connect.runtime.rest.RestServer: Stopping REST server
INFO  [2016-09-10 01:11:40,443] org.apache.kafka.connect.runtime.rest.RestServer: REST server stopped
INFO  [2016-09-10 01:11:40,443] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Herder stopping

But the "Herder stopped" message never comes and I can only stop the
process by calling `kill -9` on it. Any thoughts on what could cause this?


Re: [DISCUSS] Java 8 as a minimum requirement

2016-06-17 Thread Jeff Klukas
On Thu, Jun 16, 2016 at 5:20 PM, Ismael Juma <ism...@juma.me.uk> wrote:

> On Thu, Jun 16, 2016 at 11:13 PM, Stephen Boesch <java...@gmail.com>
> wrote:
>
> > @Jeff Klukas What is the concern about Scala 2.11 vs 2.12? 2.11 runs on
> > both java7 and java8
> >
>
> Scala 2.10.5 and 2.10.6 also support Java 8 for what it's worth.
>

I was under the impression that Scala 2.12 would be the first version
compatible with Java 8 bytecode, but it looks like that was a
misunderstanding on my part.

+1


Re: [DISCUSS] Java 8 as a minimum requirement

2016-06-16 Thread Jeff Klukas
Would the move to Java 8 be for all modules? I'd have some concern about
removing Java 7 compatibility for kafka-clients and for kafka streams
(though less so since it's still so new). I don't know how hard it will be
to transition a Scala 2.11 application to Scala 2.12. Are we comfortable
with the idea of applications stuck on Scala 2.11 or otherwise unable to
update to Java 8 not having access to new client releases?

On Thu, Jun 16, 2016 at 5:05 PM, Philippe Derome  wrote:

> I strongly support the motion, having had difficulty running (Apache Kafka,
> as opposed to Confluent) Streams examples with JDK 8 today.
> On 16 Jun 2016 4:46 p.m., "Ismael Juma"  wrote:
>
> > Hi all,
> >
> > I would like to start a discussion on making Java 8 a minimum requirement
> > for Kafka's next feature release (let's say Kafka 0.10.1.0 for now). This
> > is the first discussion on the topic so the idea is to understand how
> > people feel about it. If people feel it's too soon, then we can pick up
> the
> > conversation again after Kafka 0.10.1.0. If the feedback is mostly
> > positive, I will start a vote thread.
> >
> > Let's start with some dates. Java 7 hasn't received public updates since
> > April 2015[1], Java 8 was released in March 2014[2] and Java 9 is
> scheduled
> > to be released in March 2017[3].
> >
> > The first argument for dropping support for Java 7 is that the last
> public
> > release by Oracle contains a large number of known security
> > vulnerabilities. The effectiveness of Kafka's security features is
> reduced
> > if the underlying runtime is not itself secure.
> >
> > The second argument for moving to Java 8 is that it adds a number of
> > compelling features:
> >
> > * Lambda expressions and method references (particularly useful for the
> > Kafka Streams DSL)
> > * Default methods (very useful for maintaining compatibility when adding
> > methods to interfaces)
> > * java.util.stream (helpful for making collection transformations more
> > concise)
> > * Lots of improvements to java.util.concurrent (CompletableFuture,
> > DoubleAdder, DoubleAccumulator, StampedLock, LongAdder, LongAccumulator)
> > * Other nice things: SplittableRandom, Optional (and many others I have
> not
> > mentioned)
> >
> > The third argument is that it will simplify our testing matrix: we won't
> > have to test with Java 7 any longer (this is particularly useful for
> system
> > tests that take hours to run). It will also make it easier to support
> Scala
> > 2.12, which requires Java 8.
> >
> > The fourth argument is that many other open-source projects have taken
> the
> > leap already. Examples are Cassandra[4], Lucene[5], Akka[6], Hadoop 3[7],
> > Jetty[8], Eclipse[9], IntelliJ[10] and many others[11]. Even Android will
> > support Java 8 in the next version (although it will take a while before
> > most phones will use that version sadly). This reduces (but does not
> > eliminate) the chance that we would be the first project that would
> cause a
> > user to consider a Java upgrade.
> >
> > The main argument for not making the change is that a reasonable number
> of
> > users may still be using Java 7 by the time Kafka 0.10.1.0 is released.
> > More specifically, we care about the subset who would be able to upgrade
> to
> > Kafka 0.10.1.0, but would not be able to upgrade the Java version. It
> would
> > be great if we could quantify this in some way.
> >
> > What do you think?
> >
> > Ismael
> >
> > [1] https://java.com/en/download/faq/java_7.xml
> > [2] https://blogs.oracle.com/thejavatutorials/entry/jdk_8_is_released
> > [3] http://openjdk.java.net/projects/jdk9/
> > [4] https://github.com/apache/cassandra/blob/trunk/README.asc
> > [5] https://lucene.apache.org/#highlights-of-this-lucene-release-include
> > [6] http://akka.io/news/2015/09/30/akka-2.4.0-released.html
> > [7] https://issues.apache.org/jira/browse/HADOOP-11858
> > [8] https://webtide.com/jetty-9-3-features/
> > [9] http://markmail.org/message/l7s276y3xkga2eqf
> > [10]
> >
> >
> https://intellij-support.jetbrains.com/hc/en-us/articles/206544879-Selecting-the-JDK-version-the-IDE-will-run-under
> > [11] http://markmail.org/message/l7s276y3xkga2eqf
> >
>
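
To make the Streams DSL point above concrete, here is a before/after sketch
of the same filter (the words stream is an assumed KStream<String, String>;
Predicate is org.apache.kafka.streams.kstream.Predicate):

    // Java 7: an anonymous inner class for a simple filter.
    KStream<String, String> longWords = words.filter(new Predicate<String, String>() {
        @Override
        public boolean test(String key, String value) {
            return value.length() > 5;
        }
    });

    // Java 8: the same filter as a lambda.
    KStream<String, String> longWordsLambda =
        words.filter((key, value) -> value.length() > 5);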


Re: Handling of nulls in KTable groupBy

2016-06-08 Thread Jeff Klukas
Doing some more investigation into this, the KTable-KTable inner join is
indeed emitting records on every update of either KTable. If there is no
match found, the record that's emitted is null. This may be a conscious
design decision due to the continuous nature of the join, although I'd love
to hear confirmation or commentary on that.

Assuming the above is true, I think a KTable-KTable join followed by a
groupBy is simply not possible.

I discovered a different approach which seems roundabout, but appears to
work. I can convert the joined KTable to a KStream, filter out null values,
and then use a KStream.map operation to change the key (rather than
repartitioning via groupBy). Finally, reduceByKey can get us back to a
KTable:

table1.join(table2, joiner)
    .toStream()
    .filterNot((k, v) -> k == null || v == null)
    .map( ... )
    .reduceByKey( ... )

Is it necessary to convert to a KStream in order to filter out the null
join values?
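
For readers on later releases: the later DSL replaced reduceByKey with
groupByKey().reduce(), so the same pipeline would look roughly like the
sketch below. The serdes and the rekeying step are illustrative, and
v.newKey() is a stand-in for whatever value field supplies the new key:

    table1.join(table2, joiner)
        .toStream()
        .filterNot((k, v) -> k == null || v == null)
        .map((k, v) -> KeyValue.pair(v.newKey(), v))        // rekey on a value field
        .groupByKey(Grouped.with(Serdes.String(), valueSerde))
        .reduce((oldVal, newVal) -> newVal);                // back to a KTable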

-- Forwarded message --
From: Jeff Klukas <jklu...@simple.com>
To: users@kafka.apache.org
Cc: Guozhang Wang <wangg...@gmail.com>
Date: Wed, 8 Jun 2016 10:56:26 -0400
Subject: Handling of nulls in KTable groupBy
I have a seemingly simple case where I want to join two KTables to produce
a new table with a different key, but I am getting NPEs. My understanding
is that to change the key of a KTable, I need to do a groupBy and a reduce.

What I believe is going on is that the inner join operation is emitting
nulls in the case that no matching record is found in one of the source
KTables. The groupBy operation then receives null inputs that it's not
expecting.

Here is the snippet of code where I define the join and the groupBy:

customerIdToAccountIdLookup
    .join(customerIdToUserIdLookup,
        (Integer accountId, String userId) -> {
            return new KeyValue<>(accountId, userId);
        })
    .groupBy((Integer customerId, KeyValue<Integer, String> kv) -> {
        return kv;
    }, Serdes.Integer(), Serdes.String())

This produces the following exception:

! java.lang.NullPointerException: null
! at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:88)

Am I approaching this incorrectly, or is there a bug going on? Should a
KTable-KTable inner join be emitting records when no match is found?


Handling of nulls in KTable groupBy

2016-06-08 Thread Jeff Klukas
I have a seemingly simple case where I want to join two KTables to produce
a new table with a different key, but I am getting NPEs. My understanding
is that to change the key of a KTable, I need to do a groupBy and a reduce.

What I believe is going on is that the inner join operation is emitting
nulls in the case that no matching record is found in one of the source
KTables. The groupBy operation then receives null inputs that it's not
expecting.

Here is the snippet of code where I define the join and the groupBy:

customerIdToAccountIdLookup
    .join(customerIdToUserIdLookup,
        (Integer accountId, String userId) -> {
            return new KeyValue<>(accountId, userId);
        })
    .groupBy((Integer customerId, KeyValue<Integer, String> kv) -> {
        return kv;
    }, Serdes.Integer(), Serdes.String())

This produces the following exception:

! java.lang.NullPointerException: null
! at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:88)

Am I approaching this incorrectly, or is there a bug going on? Should a
KTable-KTable inner join be emitting records when no match is found?


Expose Kafka Streams test fixtures as a kafka-streams-test package?

2016-04-25 Thread Jeff Klukas
While developing a Kafka Streams application, I ended up copying the
content of streams/src/test/java/org/apache/kafka/test/ into my project in
order to use the KStreamTestDriver and associated functionality in tests,
which is working really well.

Would the Kafka team be open to refactoring these fixtures to be a separate
package, perhaps moving the source into a new
streams/test-fixtures/src/main and adding appropriate configuration to the
gradle build to pull in that package as a test dependency where needed and
to publish it as a separate artifact?

If there's interest in this, I would submit a PR.


How to "buffer" a stream with high churn and output only at the end of a window?

2016-04-19 Thread Jeff Klukas
Is it true that the aggregation and reduction methods of KStream will emit
a new output message for each incoming message?

I have an application that's copying a Postgres replication stream to a
Kafka topic, and activity tends to be clustered, with many updates to a
given primary key happening in quick succession. I'd like to smooth that
out by buffering the messages in tumbling windows, allowing the updates to
overwrite one another, and emitting output messages only at the end of the
window.

Does the Kafka Streams API provide any hooks that I could use to achieve
this kind of windowed "buffering" or "deduplication" of a stream?
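
For anyone finding this thread later: Kafka 2.1 added suppress(), which
provides exactly this emit-only-on-window-close buffering. A sketch, assuming
String keys and values and a one-minute window (topic names illustrative):

    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("updates", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
        .reduce((oldVal, newVal) -> newVal)   // later updates overwrite earlier ones
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .map((windowedKey, value) -> KeyValue.pair(windowedKey.key(), value))
        .to("deduplicated-updates");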


How to explicitly apply TimestampExtractor?

2016-04-15 Thread Jeff Klukas
The only hook I see for specifying a TimestampExtractor is in the
Properties that you pass when creating a KafkaStreams instance. Is it
possible to modify the timestamp while processing a stream, or does the
timestamp need to be extracted immediately upon entry into the topology?

I have a case where I'm creating a KStream from a topic with mostly
JSON-formatted messages. I need to deserialize as a byte array, filter out
non-JSON messages, call .map on the stream to deserialize those byte arrays
into the desired POJOs, and only then reach into the objects to extract the
desired timestamp.

Workarounds I've imagined are either to define a TimestampExtractor that
attempts a partial deserialization of the payload to get at the timestamp
field, or to create two separate topologies, with the second one reading a
topic that's already filtered.
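
A sketch of the first workaround, assuming Jackson on the classpath, a
top-level "timestamp" field in the payload, and the two-argument extract()
signature that later releases settled on:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class JsonFieldTimestampExtractor implements TimestampExtractor {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        @Override
        public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
            try {
                // The topology consumes raw bytes, so the value is a byte[] here.
                JsonNode node = MAPPER.readTree((byte[]) record.value());
                if (node.hasNonNull("timestamp")) {
                    return node.get("timestamp").asLong();
                }
            } catch (Exception e) {
                // Not JSON, or no usable field: fall through to the default.
            }
            return record.timestamp();  // fall back to the record's own timestamp
        }
    }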


kafka client and streams jars for RC builds

2016-04-07 Thread Jeff Klukas
I'm developing a Kafka Streams application and trying to keep up with the
evolution of the API as 0.10.0.0 release candidates come out. We've got a
test cluster running RC0, but it doesn't look like client or streams jars
for the RCs are available on Maven Central.

Are there any plans to upload jars for the RCs to a maven repo? Or will I
have to build them myself as we develop?


Re: KStream-KTable join with the KTable given a "head start"

2016-03-30 Thread Jeff Klukas
-- Forwarded message --
> From: Jeff Klukas <jklu...@simple.com>
> To: users@kafka.apache.org
> Cc:
> Date: Wed, 30 Mar 2016 11:14:53 -0400
> Subject: KStream-KTable join with the KTable given a "head start"
> I have a KStream that I want to enrich with some values from a lookup
> table. When a new key enters the KStream, there's likely to be a
> corresponding entry arriving on the KTable at the same time, so we end up
> with a race condition. If the KTable record arrives first, then its value
> is available for joining when the corresponding record arrives on the
> KStream. If the KStream record arrives first, however, we'll get a null
> join even if the KTable gets the corresponding record only milliseconds
> later.
>
> I'd like to give the KTable a "head start" of ~10 seconds, so that it gets
> a chance to get updated before the corresponding records arrive on the
> KStream. Could I achieve this using one of the existing Windowing
> strategies?
>
>
> -- Forwarded message --
> From: Guozhang Wang <wangg...@gmail.com>
> To: "users@kafka.apache.org" <users@kafka.apache.org>
> Cc:
> Date: Wed, 30 Mar 2016 13:51:03 -0700
> Subject: Re: KStream-KTable join with the KTable given a "head start"
> Hi Jeff,
>
> This is a common case of stream-table join, in that the joining results
> depend on the arrival ordering from these two sources.
>
> In Kafka Streams you can try to "synchronize" multiple input streams
> through the "TimestampExtractor" interface, which is used to assign a
> timestamp to each record polled from Kafka to start the processing. You
> can, for example, set the timestamps for your KStream within a later time
> interval and the timestamps for your KTable stream with an earlier time
> interval, so that the records from table are likely to be processed first.
> Note that this is a best effort, in that we cannot guarantee global
> ordering across streams while processing: if you have a much later
> record coming from the KTable, it will not block earlier records from
> the KStream from being processed first. But we think this mechanism should be
> sufficient in practice.
>
> Let me know if it fits with your scenario, and if not we can talk about how
> it can be possibly improved.
>


What would I pass in for a window in this case? Or would I not pass in a
window?

I don't want to put any lower limit on the KTable timestamps I'd be willing
to join on (the corresponding entry in the KTable could have been from
weeks ago, or it could be fired right at the same time as the KStream
event).

Could I use JoinWindows.before() and pass in an arbitrarily long interval?
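
For what it's worth, JoinWindows applies only to KStream-KStream joins; the
KStream-KTable join is unwindowed, so there is no window to pass. A sketch of
Guozhang's timestamp-shifting suggestion instead, with the topic name and the
10-second head start as illustrative assumptions (and assuming the later
two-argument extractor signature):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class TableHeadStartExtractor implements TimestampExtractor {
        private static final long HEAD_START_MS = 10_000L;

        @Override
        public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
            long ts = record.timestamp();
            // Shift the lookup-table topic's timestamps earlier so Streams
            // prefers to process its records first.
            return "lookup-table-topic".equals(record.topic()) ? ts - HEAD_START_MS : ts;
        }
    }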


KStream-KTable join with the KTable given a "head start"

2016-03-30 Thread Jeff Klukas
I have a KStream that I want to enrich with some values from a lookup
table. When a new key enters the KStream, there's likely to be a
corresponding entry arriving on the KTable at the same time, so we end up
with a race condition. If the KTable record arrives first, then its value
is available for joining when the corresponding record arrives on the
KStream. If the KStream record arrives first, however, we'll get a null
join even if the KTable gets the corresponding record only milliseconds
later.

I'd like to give the KTable a "head start" of ~10 seconds, so that it gets
a chance to get updated before the corresponding records arrive on the
KStream. Could I achieve this using one of the existing Windowing
strategies?


Lookup table common to all threads in a Kafka Streams app

2016-03-19 Thread Jeff Klukas
I'm experimenting with the Kafka Streams preview and understand that joins
can only happen between KStreams and/or KTables that are co-partitioned.
This is a reasonable limitation necessary to support large streams.

What if I have a small topic, though, that I'd like to be able to join
based on values in a stream's messages rather than the partition key? Could
there be a concept of a fully replicated KTable where every thread in my
Kafka Streams application would read a full copy into memory to be
available for joins without the restriction on shared keys?

I could probably achieve the effect I want by implementing a consumer in a
separate thread to read the topic into RocksDB. I would then do lookups
from that separate RocksDB instance in "map" operations within my Kafka
Streams application.

Is there an easier alternative that I'm missing? It would be nice to have a
standard mechanism for maintaining small topics like these and making them
available for joins without key restrictions.
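
Postscript: this is essentially what GlobalKTable (KIP-99, released in
0.10.2) later provided: a fully replicated table that a stream can join
against using a key derived from its values. A sketch, with topic names and
the key-derivation logic as illustrative assumptions:

    StreamsBuilder builder = new StreamsBuilder();
    GlobalKTable<String, String> lookup =
        builder.globalTable("small-lookup-topic");   // replicated in full to every instance
    KStream<String, String> events = builder.stream("events");
    events
        .join(lookup,
              (eventKey, eventValue) -> eventValue,  // derive the lookup key from the value
              (eventValue, lookupValue) -> eventValue + "|" + lookupValue)
        .to("enriched-events");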