Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams
From what I can tell, global state stores are managed separately from other state stores and are accessed via different methods. Do the proposed methods on TopologyTestDriver (such as getStateStore) cover global stores? If not, can we add an interface for accessing and testing global stores in the scope of this KIP?

On Thu, Jan 11, 2018 at 9:06 PM, Matthias J. Sax wrote:
> Dear Kafka community,
>
> I want to propose KIP-247 to add public test utils to the Streams API.
> The goal is to simplify testing of Kafka Streams applications.
>
> Please find details in the wiki:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams
>
> This is an initial KIP, and we hope to add more utility functions later.
> Thus, this KIP is not comprehensive but a first step. Of course, we can
> enrich this initial KIP if we think it falls too short. But we should
> not aim to be comprehensive to keep the scope manageable.
>
> In fact, I think we should add some more helpers to simplify result
> verification. I will update the KIP with this asap. Just wanted to start
> the discussion early on.
>
> An initial WIP PR can be found here:
> https://github.com/apache/kafka/pull/4402
>
> I also included the user-list (please hit "reply-all" to include both
> lists in this KIP discussion).
>
> Thanks a lot.
>
> -Matthias
Metrics for Kafka Connect
The Kafka docs on Monitoring don't mention anything specific to Kafka Connect. Are metrics for connectors limited to just the standard consumer/producer metrics? Do I understand correctly that the Connect API doesn't provide any hooks for custom connector-specific metrics? If there are no such hooks, is that something that's on the roadmap at all?
Documentation for 0.10.1.1
I'm starting to look at upgrading to 0.10.1.1, but it looks like the docs have not been updated since 0.10.1.0. Are there any plans to update the docs to explicitly discuss how to upgrade from 0.10.1.0 -> 0.10.1.1, and from 0.10.0.X -> 0.10.1.1?
Kafka Connect startup issues
I'm doing some local testing on my Mac to get a feel for Kafka Connect, and I'm running into several issues.

First, when I untar the Kafka 0.10.0.1 source and run `./bin/connect-distributed.sh config/connect-distributed.properties`, I get a "Usage" message. By digging through the scripts a bit, I found that the CLASSPATH variable ends up being empty, so the script interprets the properties file as the classpath and thus thinks no properties file was passed. To get past that hurdle, I had to find the compiled `connect-runtime` jar and explicitly set the classpath to point at that jar. Is it expected that the connect-distributed.sh script should be usable directly from the Kafka source? Am I missing documentation about setup?

Second, I'm finding that I can't get the Kafka Connect process to shut down cleanly. The DistributedHerder always hangs on shutdown. When I hit Ctrl-C, I see the following log messages:

INFO [2016-09-10 01:11:40,430] org.apache.kafka.connect.runtime.Connect: Kafka Connect stopping
INFO [2016-09-10 01:11:40,430] org.apache.kafka.connect.runtime.rest.RestServer: Stopping REST server
INFO [2016-09-10 01:11:40,443] org.apache.kafka.connect.runtime.rest.RestServer: REST server stopped
INFO [2016-09-10 01:11:40,443] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Herder stopping

But the "Herder stopped" message never comes, and I can only stop the process by calling `kill -9` on it. Any thoughts on what could cause this?
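For reference, the classpath workaround described above can be sketched as a small shell snippet. The jar search pattern and the `./gradlew jar` build step are assumptions about a default Gradle build of the source tree; adjust for your layout.

```shell
# Sketch of the workaround: point CLASSPATH at the built connect-runtime
# jar before invoking the launcher script, since the script otherwise
# leaves CLASSPATH empty when run from an unpacked source tree.
CONNECT_JAR="$(find . -name 'connect-runtime*.jar' 2>/dev/null | head -n 1)"
if [ -n "$CONNECT_JAR" ]; then
  export CLASSPATH="$CONNECT_JAR"
  ./bin/connect-distributed.sh config/connect-distributed.properties
else
  echo "connect-runtime jar not found; build the source first (e.g. ./gradlew jar)"
fi
```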
Re: [DISCUSS] Java 8 as a minimum requirement
On Thu, Jun 16, 2016 at 5:20 PM, Ismael Juma <ism...@juma.me.uk> wrote:
> On Thu, Jun 16, 2016 at 11:13 PM, Stephen Boesch <java...@gmail.com> wrote:
>
> > @Jeff Klukas What is the concern about scala 2.11 vs 2.12? 2.11 runs on
> > both java7 and java8
>
> Scala 2.10.5 and 2.10.6 also support Java 8, for what it's worth.

I was under the impression that Scala 2.12 would be the first version compatible with Java 8 bytecode, but it looks like that was a misunderstanding on my part. +1
Re: [DISCUSS] Java 8 as a minimum requirement
Would the move to Java 8 be for all modules? I'd have some concern about removing Java 7 compatibility for kafka-clients and for Kafka Streams (though less so, since it's still so new). I don't know how hard it will be to transition a Scala 2.11 application to Scala 2.12. Are we comfortable with the idea of applications stuck on Scala 2.11, or otherwise unable to update to Java 8, not having access to new client releases?

On Thu, Jun 16, 2016 at 5:05 PM, Philippe Derome wrote:
> I strongly support the motion, having difficulty running (Apache Kafka as
> opposed to Confluent) Streams examples with JDK 8 today.
>
> On 16 Jun 2016 4:46 p.m., "Ismael Juma" wrote:
> > Hi all,
> >
> > I would like to start a discussion on making Java 8 a minimum requirement
> > for Kafka's next feature release (let's say Kafka 0.10.1.0 for now). This
> > is the first discussion on the topic, so the idea is to understand how
> > people feel about it. If people feel it's too soon, then we can pick up
> > the conversation again after Kafka 0.10.1.0. If the feedback is mostly
> > positive, I will start a vote thread.
> >
> > Let's start with some dates. Java 7 hasn't received public updates since
> > April 2015[1], Java 8 was released in March 2014[2], and Java 9 is
> > scheduled to be released in March 2017[3].
> >
> > The first argument for dropping support for Java 7 is that the last
> > public release by Oracle contains a large number of known security
> > vulnerabilities. The effectiveness of Kafka's security features is
> > reduced if the underlying runtime is not itself secure.
> >
> > The second argument for moving to Java 8 is that it adds a number of
> > compelling features:
> >
> > * Lambda expressions and method references (particularly useful for the
> >   Kafka Streams DSL)
> > * Default methods (very useful for maintaining compatibility when adding
> >   methods to interfaces)
> > * java.util.stream (helpful for making collection transformations more
> >   concise)
> > * Lots of improvements to java.util.concurrent (CompletableFuture,
> >   DoubleAdder, DoubleAccumulator, StampedLock, LongAdder, LongAccumulator)
> > * Other nice things: SplittableRandom, Optional (and many others I have
> >   not mentioned)
> >
> > The third argument is that it will simplify our testing matrix; we won't
> > have to test with Java 7 any longer (this is particularly useful for
> > system tests that take hours to run). It will also make it easier to
> > support Scala 2.12, which requires Java 8.
> >
> > The fourth argument is that many other open-source projects have taken
> > the leap already. Examples are Cassandra[4], Lucene[5], Akka[6],
> > Hadoop 3[7], Jetty[8], Eclipse[9], IntelliJ[10] and many others[11]. Even
> > Android will support Java 8 in the next version (although it will sadly
> > take a while before most phones use that version). This reduces (but does
> > not eliminate) the chance that we would be the first project that would
> > cause a user to consider a Java upgrade.
> >
> > The main argument for not making the change is that a reasonable number
> > of users may still be using Java 7 by the time Kafka 0.10.1.0 is
> > released. More specifically, we care about the subset who would be able
> > to upgrade to Kafka 0.10.1.0 but would not be able to upgrade the Java
> > version. It would be great if we could quantify this in some way.
> >
> > What do you think?
> >
> > Ismael
> >
> > [1] https://java.com/en/download/faq/java_7.xml
> > [2] https://blogs.oracle.com/thejavatutorials/entry/jdk_8_is_released
> > [3] http://openjdk.java.net/projects/jdk9/
> > [4] https://github.com/apache/cassandra/blob/trunk/README.asc
> > [5] https://lucene.apache.org/#highlights-of-this-lucene-release-include
> > [6] http://akka.io/news/2015/09/30/akka-2.4.0-released.html
> > [7] https://issues.apache.org/jira/browse/HADOOP-11858
> > [8] https://webtide.com/jetty-9-3-features/
> > [9] http://markmail.org/message/l7s276y3xkga2eqf
> > [10] https://intellij-support.jetbrains.com/hc/en-us/articles/206544879-Selecting-the-JDK-version-the-IDE-will-run-under
> > [11] http://markmail.org/message/l7s276y3xkga2eqf
Re: Handling of nulls in KTable groupBy
Doing some more investigation into this, the KTable-KTable inner join is indeed emitting records on every update of either KTable. If there is no match found, the record that's emitted is null. This may be a conscious design decision due to the continuous nature of the join, although I'd love to hear confirmation or commentary on that.

Assuming the above is true, I think a KTable-KTable join followed by a groupBy is simply not possible. I discovered a different approach which seems roundabout, but appears to work. I can convert the joined KTable to a KStream, filter out null values, and then use a KStream.map operation to change the key (rather than repartitioning via groupBy). Finally, reduceByKey can get us back to a KTable:

table1.join(table2, joiner)
    .toStream()
    .filterNot((k, v) -> k == null || v == null)
    .map( ... )
    .reduceByKey( ... )

Is it necessary to convert to a KStream in order to filter out the null join values?

-- Forwarded message --
From: Jeff Klukas <jklu...@simple.com>
To: users@kafka.apache.org
Cc: Guozhang Wang <wangg...@gmail.com>
Date: Wed, 8 Jun 2016 10:56:26 -0400
Subject: Handling of nulls in KTable groupBy

I have a seemingly simple case where I want to join two KTables to produce a new table with a different key, but I am getting NPEs. My understanding is that to change the key of a KTable, I need to do a groupBy and a reduce. What I believe is going on is that the inner join operation is emitting nulls in the case that no matching record is found in one of the source KTables. The groupBy operation then receives null inputs that it's not expecting.

Here is the snippet of code where I define the join and the groupBy:

customerIdToAccountIdLookup.join(customerIdToUserIdLookup,
        (Integer accountId, String userId) -> {
            return new KeyValue<>(accountId, userId);
        })
    .groupBy((Integer customerId, KeyValue<Integer, String> kv) -> {
            return kv;
        }, Serdes.Integer(), Serdes.String())

This produces the following exception:

java.lang.NullPointerException: null
    at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:88)

Am I approaching this incorrectly, or is there a bug going on? Should a KTable-KTable inner join be emitting records when no match is found?
Handling of nulls in KTable groupBy
I have a seemingly simple case where I want to join two KTables to produce a new table with a different key, but I am getting NPEs. My understanding is that to change the key of a KTable, I need to do a groupBy and a reduce. What I believe is going on is that the inner join operation is emitting nulls in the case that no matching record is found in one of the source KTables. The groupBy operation then receives null inputs that it's not expecting.

Here is the snippet of code where I define the join and the groupBy:

customerIdToAccountIdLookup.join(customerIdToUserIdLookup,
        (Integer accountId, String userId) -> {
            return new KeyValue<>(accountId, userId);
        })
    .groupBy((Integer customerId, KeyValue<Integer, String> kv) -> {
            return kv;
        }, Serdes.Integer(), Serdes.String())

This produces the following exception:

java.lang.NullPointerException: null
    at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:88)

Am I approaching this incorrectly, or is there a bug going on? Should a KTable-KTable inner join be emitting records when no match is found?
Expose Kafka Streams test fixtures as a kafka-streams-test package?
As I'm developing a Kafka Streams application, I ended up copying the content of streams/src/test/java/org/apache/kafka/test/ into my project in order to use the KStreamTestDriver and associated functionality in tests, which is working really well. Would the Kafka team be open to refactoring these fixtures into a separate package, perhaps moving the source into a new streams/test-fixtures/src/main and adding appropriate configuration to the gradle build to pull in that package as a test dependency where needed and to publish it as a separate artifact? If there's interest in this, I would submit a PR.
How to "buffer" a stream with high churn and output only at the end of a window?
Is it true that the aggregation and reduction methods of KStream will emit a new output message for each incoming message? I have an application that's copying a Postgres replication stream to a Kafka topic, and activity tends to be clustered, with many updates to a given primary key happening in quick succession. I'd like to smooth that out by buffering the messages in tumbling windows, allowing the updates to overwrite one another, and emitting output messages only at the end of the window. Does the Kafka Streams API provide any hooks that I could use to achieve this kind of windowed "buffering" or "deduplication" of a stream?
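As of the API at the time of this question, there is no built-in operator that holds back intermediate aggregation results; one approach is a custom Processor that accumulates updates in a store and emits only when a scheduled punctuation fires at the window boundary. The buffering logic itself, stripped of Kafka types, might look like this sketch (class and method names are illustrative, not a Kafka Streams API):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of windowed "buffering"/"deduplication": updates
// within a window overwrite each other, and flush() -- which a real
// Processor would call from punctuate() at the window boundary -- emits
// only the final value seen for each key.
public class WindowBuffer<K, V> {
    private final Map<K, V> latest = new HashMap<>();

    // Called once per incoming record; later values overwrite earlier ones,
    // so rapid successive updates to one primary key collapse to a single
    // output record.
    public void update(K key, V value) {
        latest.put(key, value);
    }

    // Called at the end of the tumbling window; returns one record per key
    // and clears the buffer for the next window.
    public Map<K, V> flush() {
        Map<K, V> out = new HashMap<>(latest);
        latest.clear();
        return out;
    }
}
```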
How to explicitly apply TimestampExtractor?
The only hook I see for specifying a TimestampExtractor is in the Properties that you pass when creating a KafkaStreams instance. Is it possible to modify the timestamp while processing a stream, or does the timestamp need to be extracted immediately upon entry into the topology? I have a case where I'm creating a KStream from a topic with mostly JSON-formatted messages. I need to deserialize as byte array, filter out non-JSON messages, call .map on the stream to deserialize those objects into desired POJOs, and only then reach into the objects to extract the desired timestamp. Workarounds I've imagined are either to define a TimestampExtractor that attempts to do some partial deserialization of the payload to get at the timestamp field; or, to create two separate topologies, with the second one reading a topic that's already filtered.
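The first workaround mentioned, a TimestampExtractor that partially deserializes the payload, could be built around a helper like the following sketch. The field name "timestamp", the regex approach, and the class name are all illustrative assumptions; the logic is kept free of Kafka types so it is easy to test in isolation.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative helper for a custom TimestampExtractor: pull a numeric
// "timestamp" field out of a raw JSON payload with a regex rather than a
// full deserialization. The field name and fallback behavior are
// assumptions for the sketch.
public class JsonTimestampGuesser {
    private static final Pattern TS =
        Pattern.compile("\"timestamp\"\\s*:\\s*(\\d+)");

    // Returns the embedded timestamp, or fallback when the payload is not
    // JSON or has no timestamp field (e.g. the non-JSON messages that the
    // topology filters out later anyway).
    public static long guessTimestamp(String payload, long fallback) {
        if (payload == null) {
            return fallback;
        }
        Matcher m = TS.matcher(payload);
        return m.find() ? Long.parseLong(m.group(1)) : fallback;
    }
}
```

Inside a TimestampExtractor implementation, one would call this on the record's raw value and return something like the record's ingestion time as the fallback.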
kafka client and streams jars for RC builds
I'm developing a Kafka Streams application and trying to keep up with the evolution of the API as 0.10.0.0 release candidates come out. We've got a test cluster running RC0, but it doesn't look like client or streams jars for the RCs are available on maven central. Are there any plans to upload jars for the RCs to a maven repo? Or will I have to build them myself as we develop?
Re: KStream-KTable join with the KTable given a "head start"
-- Forwarded message --
> From: Jeff Klukas <jklu...@simple.com>
> To: users@kafka.apache.org
> Date: Wed, 30 Mar 2016 11:14:53 -0400
> Subject: KStream-KTable join with the KTable given a "head start"
>
> I have a KStream that I want to enrich with some values from a lookup
> table. When a new key enters the KStream, there's likely to be a
> corresponding entry arriving on the KTable at the same time, so we end up
> with a race condition. If the KTable record arrives first, then its value
> is available for joining when the corresponding record arrives on the
> KStream. If the KStream record arrives first, however, we'll get a null
> join even if the KTable gets the corresponding record only milliseconds
> later.
>
> I'd like to give the KTable a "head start" of ~10 seconds, so that it gets
> a chance to get updated before the corresponding records arrive on the
> KStream. Could I achieve this using one of the existing windowing
> strategies?

-- Forwarded message --
> From: Guozhang Wang <wangg...@gmail.com>
> To: "users@kafka.apache.org" <users@kafka.apache.org>
> Date: Wed, 30 Mar 2016 13:51:03 -0700
> Subject: Re: KStream-KTable join with the KTable given a "head start"
>
> Hi Jeff,
>
> This is a common case of stream-table join, in that the joining results
> depend on the arrival ordering from these two sources.
>
> In Kafka Streams you can try to "synchronize" multiple input streams
> through the "TimestampExtractor" interface, which is used to assign a
> timestamp to each record polled from Kafka to start the processing. You
> can, for example, set the timestamps for your KStream within a later time
> interval and the timestamps for your KTable stream within an earlier time
> interval, so that the records from the table are likely to be processed
> first.
>
> Note that this is a best effort, in that we cannot guarantee global
> ordering across streams while processing, so if you have a much later
> record coming from the KTable then it will not block earlier records from
> the KStream from being processed first. But we think this mechanism should
> be sufficient in practice.
>
> Let me know if it fits with your scenario, and if not we can talk about
> how it can possibly be improved.

What would I pass in for a window in this case? Or would I not pass in a window? I don't want to put any lower limit on the KTable timestamps I'd be willing to join on (the corresponding entry in the KTable could have been from weeks ago, or it could be fired right at the same time as the KStream event). Could I use JoinWindows.before() and pass in an arbitrarily long interval?
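The timestamp-shifting idea from the reply can be made concrete with a small sketch. The class and constant names are illustrative; in practice these values would be returned from custom TimestampExtractor implementations configured for the two input topics, and the 10-second delay is the value proposed in the thread.

```java
// Illustrative sketch of the "head start" technique: shift the timestamps
// assigned to KStream records forward by a fixed delay, so that KTable
// records with the same wall-clock event time sort earlier and are likely
// to be processed first.
public class HeadStart {
    public static final long STREAM_DELAY_MS = 10_000L;

    // What a custom TimestampExtractor for the KStream side might return,
    // given a record's embedded event time.
    public static long streamTimestamp(long eventTimeMs) {
        return eventTimeMs + STREAM_DELAY_MS;
    }

    // The KTable side keeps its original event time, so a table record
    // created at the same instant as a stream record is ordered first.
    public static long tableTimestamp(long eventTimeMs) {
        return eventTimeMs;
    }
}
```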
KStream-KTable join with the KTable given a "head start"
I have a KStream that I want to enrich with some values from a lookup table. When a new key enters the KStream, there's likely to be a corresponding entry arriving on the KTable at the same time, so we end up with a race condition. If the KTable record arrives first, then its value is available for joining when the corresponding record arrives on the KStream. If the KStream record arrives first, however, we'll get a null join even if the KTable gets the corresponding record only milliseconds later. I'd like to give the KTable a "head start" of ~10 seconds, so that it gets a chance to get updated before the corresponding records arrive on the KStream. Could I achieve this using one of the existing windowing strategies?
Lookup table common to all threads in a Kafka Streams app
I'm experimenting with the Kafka Streams preview and understand that joins can only happen between KStreams and/or KTables that are co-partitioned. This is a reasonable limitation necessary to support large streams. What if I have a small topic, though, that I'd like to be able to join based on values in a stream's messages rather than the partition key? Could there be a concept of a fully replicated KTable, where every thread in my Kafka Streams application would read a full copy into memory to be available for joins without the restriction on shared keys? I could probably achieve the effect I want by implementing a consumer in a separate thread to read the topic into RocksDB. I would then do lookups from that separate RocksDB instance in "map" operations within my Kafka Streams application. Is there an easier alternative that I'm missing? It would be nice to have a standard mechanism for maintaining small topics like these and making them available for joins without key restrictions.
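The consumer-in-a-separate-thread workaround described above might center on a structure like this sketch (class and method names are illustrative; the consumer poll loop is shown only as a comment, and a plain in-memory map stands in for the RocksDB instance):

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of a fully replicated lookup table: a background
// consumer thread materializes a small (ideally compacted) topic into an
// in-memory map that map() operations in the Streams topology can consult
// without any co-partitioning requirement, since every application
// instance holds the full table.
public class ReplicatedLookupTable {
    private final ConcurrentHashMap<String, String> table = new ConcurrentHashMap<>();

    // In a real application, a dedicated thread would poll a consumer
    // subscribed to the lookup topic and apply each record:
    //
    //   while (running) {
    //       for (ConsumerRecord<String, String> r : consumer.poll(100)) {
    //           applyUpdate(r.key(), r.value());
    //       }
    //   }
    public void applyUpdate(String key, String value) {
        if (value == null) {
            table.remove(key); // tombstone: delete semantics, as in a KTable
        } else {
            table.put(key, value);
        }
    }

    // Called from map() operations in the topology; any field of the
    // stream's value can serve as the lookup key.
    public String lookup(String key) {
        return table.get(key);
    }
}
```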