[jira] [Updated] (KAFKA-4932) Add UUID Serde
[ https://issues.apache.org/jira/browse/KAFKA-4932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Klukas updated KAFKA-4932: --- Description: I propose adding serializers and deserializers for the java.util.UUID class. I have many use cases where I want to set the key of a Kafka message to be a UUID. Currently, I need to turn UUIDs into strings or byte arrays and use their associated Serdes, but it would be more convenient to serialize and deserialize UUIDs directly. I'd propose that the serializer and deserializer use the 36-byte string representation, calling UUID.toString and UUID.fromString. We would also wrap these in a Serde and modify the streams Serdes class to include this in the list of supported types. Optionally, we could have the deserializer support a 16-byte representation and it would check the size of the input byte array to determine whether it's a binary or string representation of the UUID. It's not well defined whether the most significant bits or least significant go first, so this deserializer would have to support only one or the other. Similarly, if the deserializer supported a 16-byte representation, there could be two variants of the serializer, a UUIDStringSerializer and a UUIDBytesSerializer. I would be willing to write this PR, but am looking for feedback about whether there are significant concerns here around ambiguity of what the byte representation of a UUID should be, or if there's desire to keep the list of built-in Serdes minimal such that a PR would be unlikely to be accepted. was: I propose adding serializers and deserializers for the java.util.UUID class. I have many use cases where I want to set the key of a Kafka message to be a UUID. Currently, I need to turn UUIDs into strings or byte arrays and use the associated Serdes, but it would be more convenient to serialize and deserialize UUIDs directly. I'd propose that the serializer and deserializer use the 36-byte string representation, calling UUID.toString and UUID.fromString. Optionally, we could also have the deserializer support a 16-byte representation and it would check the size of the input byte array to determine whether it's a binary or string representation of the UUID. It's not well defined whether the most significant bits or least significant go first, so this deserializer would have to support only one or the other. Optionally, there could be two variants of the serializer, a UUIDStringSerializer and a UUIDBytesSerializer. We would also wrap these in a Serde and modify the Serdes class to include this in the list of supported types. I would be willing to write this PR, but am looking for feedback about whether there are significant concerns here around ambiguity of what the byte representation of a UUID should be, or if there's desire to keep the list of built-in Serdes minimal such that a PR would be unlikely to be accepted. > Add UUID Serde > -- > > Key: KAFKA-4932 > URL: https://issues.apache.org/jira/browse/KAFKA-4932 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Reporter: Jeff Klukas >Priority: Minor > > I propose adding serializers and deserializers for the java.util.UUID class. > I have many use cases where I want to set the key of a Kafka message to be a > UUID. Currently, I need to turn UUIDs into strings or byte arrays and use > their associated Serdes, but it would be more convenient to serialize and > deserialize UUIDs directly. 
> I'd propose that the serializer and deserializer use the 36-byte string > representation, calling UUID.toString and UUID.fromString. We would also wrap > these in a Serde and modify the streams Serdes class to include this in the > list of supported types. > Optionally, we could have the deserializer support a 16-byte representation > and it would check the size of the input byte array to determine whether it's > a binary or string representation of the UUID. It's not well defined whether > the most significant bits or least significant go first, so this deserializer > would have to support only one or the other. > Similarly, if the deserializer supported a 16-byte representation, there could > be two variants of the serializer, a UUIDStringSerializer and a > UUIDBytesSerializer. > I would be willing to write this PR, but am looking for feedback about > whether there are significant concerns here around ambiguity of what the byte > representation of a UUID should be, or if there's desire to keep the list of > built-in Serdes minimal such that a PR would be unlikely to be accepted. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
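For illustration, here is a minimal sketch of what the proposed string-based serializer, deserializer, and Serde could look like. The class names and wiring are hypothetical and this is not the implementation that eventually landed; it only shows the UUID.toString/UUID.fromString approach described above.

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public class UUIDSerde {

    public static class UUIDSerializer implements Serializer<UUID> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) { }

        @Override
        public byte[] serialize(String topic, UUID data) {
            // Encode as the 36-character string form, e.g. "123e4567-e89b-12d3-a456-426614174000"
            return data == null ? null : data.toString().getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public void close() { }
    }

    public static class UUIDDeserializer implements Deserializer<UUID> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) { }

        @Override
        public UUID deserialize(String topic, byte[] data) {
            return data == null ? null : UUID.fromString(new String(data, StandardCharsets.UTF_8));
        }

        @Override
        public void close() { }
    }

    // Wrap the serializer/deserializer pair in a Serde for use with Kafka Streams.
    public static Serde<UUID> serde() {
        return Serdes.serdeFrom(new UUIDSerializer(), new UUIDDeserializer());
    }
}
{code}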
[jira] [Updated] (KAFKA-4932) Add UUID Serde
[ https://issues.apache.org/jira/browse/KAFKA-4932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Klukas updated KAFKA-4932: --- Summary: Add UUID Serde (was: Add UUID Serdes) > Add UUID Serde > -- > > Key: KAFKA-4932 > URL: https://issues.apache.org/jira/browse/KAFKA-4932 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Reporter: Jeff Klukas >Priority: Minor > > I propose adding serializers and deserializers for the java.util.UUID class. > I have many use cases where I want to set the key of a Kafka message to be a > UUID. Currently, I need to turn UUIDs into strings or byte arrays and use the > associated Serdes, but it would be more convenient to serialize and > deserialize UUIDs directly. > I'd propose that the serializer and deserializer use the 36-byte string > representation, calling UUID.toString and UUID.fromString. > Optionally, we could also have the deserializer support a 16-byte > representation and it would check the size of the input byte array to determine > whether it's a binary or string representation of the UUID. It's not well > defined whether the most significant bits or least significant go first, so > this deserializer would have to support only one or the other. > Optionally, there could be two variants of the serializer, a > UUIDStringSerializer and a UUIDBytesSerializer. > We would also wrap these in a Serde and modify the Serdes class to include > this in the list of supported types. > I would be willing to write this PR, but am looking for feedback about > whether there are significant concerns here around ambiguity of what the byte > representation of a UUID should be, or if there's desire to keep the list of > built-in Serdes minimal such that a PR would be unlikely to be accepted. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-4932) Add UUID Serdes
Jeff Klukas created KAFKA-4932: -- Summary: Add UUID Serdes Key: KAFKA-4932 URL: https://issues.apache.org/jira/browse/KAFKA-4932 Project: Kafka Issue Type: Improvement Components: clients, streams Reporter: Jeff Klukas Priority: Minor I propose adding serializers and deserializers for the java.util.UUID class. I have many use cases where I want to set the key of a Kafka message to be a UUID. Currently, I need to turn UUIDs into strings or byte arrays and use the associated Serdes, but it would be more convenient to serialize and deserialize UUIDs directly. I'd propose that the serializer and deserializer use the 36-byte string representation, calling UUID.toString and UUID.fromString. Optionally, we could also have the deserializer support a 16-byte representation and it would check the size of the input byte array to determine whether it's a binary or string representation of the UUID. It's not well defined whether the most significant bits or least significant go first, so this deserializer would have to support only one or the other. Optionally, there could be two variants of the serializer, a UUIDStringSerializer and a UUIDBytesSerializer. We would also wrap these in a Serde and modify the Serdes class to include this in the list of supported types. I would be willing to write this PR, but am looking for feedback about whether there are significant concerns here around ambiguity of what the byte representation of a UUID should be, or if there's desire to keep the list of built-in Serdes minimal such that a PR would be unlikely to be accepted. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (KAFKA-4257) Inconsistencies in 0.10.1 upgrade docs
[ https://issues.apache.org/jira/browse/KAFKA-4257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Klukas resolved KAFKA-4257. Resolution: Fixed Ismael's PR addresses these questions. > Inconsistencies in 0.10.1 upgrade docs > --- > > Key: KAFKA-4257 > URL: https://issues.apache.org/jira/browse/KAFKA-4257 > Project: Kafka > Issue Type: Bug > Components: documentation >Affects Versions: 0.10.1.0 >Reporter: Jeff Klukas >Priority: Minor > Fix For: 0.10.1.0 > > > There are several inconsistencies in the 0.10.1.0 upgrade docs that make it > difficult to determine what client versions are compatible with what broker > versions. > The initial heading in the upgrade docs is "Upgrading from 0.10.0.X to > 0.10.1.0", but it includes clauses about versions prior to 0.10. Is the > intention for these instructions to be valid for upgrading from brokers as > far back as 0.8? Should this section simply be called "Upgrading to 0.10.1.0"? > I cannot tell from the docs whether I can upgrade to 0.10.1.0 clients on top > of 0.10.0.X brokers. In particular, step 5 of the upgrade instructions > mentions "Once all consumers have been upgraded to 0.10.0". Should that read > 0.10.1, or is the intention here truly that clients on 0.9.X or below need to > be at version 0.10.0.0 at a minimum? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4257) Inconsistencies in 0.10.1 upgrade docs
[ https://issues.apache.org/jira/browse/KAFKA-4257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548729#comment-15548729 ] Jeff Klukas commented on KAFKA-4257: The pull request definitely helps. Thanks for jumping in to make those changes. I left a comment on the PR. > Inconsistencies in 0.10.1 upgrade docs > --- > > Key: KAFKA-4257 > URL: https://issues.apache.org/jira/browse/KAFKA-4257 > Project: Kafka > Issue Type: Bug > Components: documentation >Affects Versions: 0.10.1.0 >Reporter: Jeff Klukas >Priority: Minor > Fix For: 0.10.1.0 > > > There are several inconsistencies in the 0.10.1.0 upgrade docs that make it > difficult to determine what client versions are compatible with what broker > versions. > The initial heading in the upgrade docs is "Upgrading from 0.10.0.X to > 0.10.1.0", but it includes clauses about versions prior to 0.10. Is the > intention for these instructions to be valid for upgrading from brokers as > far back as 0.8? Should this section simply be called "Upgrading to 0.10.1.0"? > I cannot tell from the docs whether I can upgrade to 0.10.1.0 clients on top > of 0.10.0.X brokers. In particular, step 5 of the upgrade instructions > mentions "Once all consumers have been upgraded to 0.10.0". Should that read > 0.10.1, or is the intention here truly that clients on 0.9.X or below need to > be at version 0.10.0.0 at a minimum? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4257) Inconsistencies in 0.10.1 upgrade docs
Jeff Klukas created KAFKA-4257: -- Summary: Inconsistencies in 0.10.1 upgrade docs Key: KAFKA-4257 URL: https://issues.apache.org/jira/browse/KAFKA-4257 Project: Kafka Issue Type: Bug Components: documentation Affects Versions: 0.10.1.0 Reporter: Jeff Klukas Priority: Minor Fix For: 0.10.1.0 There are several inconsistencies in the 0.10.1.0 upgrade docs that make it difficult to determine what client versions are compatible with what broker versions. The initial heading in the upgrade docs is "Upgrading from 0.10.0.X to 0.10.1.0", but it includes clauses about versions prior to 0.10. Is the intention for these instructions to be valid for upgrading from brokers as far back as 0.8? Should this section simply be called "Upgrading to 0.10.1.0"? I cannot tell from the docs whether I can upgrade to 0.10.1.0 clients on top of 0.10.0.X brokers. In particular, step 5 of the upgrade instructions mentions "Once all consumers have been upgraded to 0.10.0". Should that read 0.10.1, or is the intention here truly that clients on 0.9.X or below need to be at version 0.10.0.0 at a minimum? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3895) Implement a default queue for expired messages
[ https://issues.apache.org/jira/browse/KAFKA-3895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533221#comment-15533221 ] Jeff Klukas commented on KAFKA-3895: It's not clear to me how the concepts here map onto the Kafka consumer model. What do you consider an "expired" message? I've been in some discussions recently with colleagues considering how we might get dead letter-like functionality in Kafka. In our case, we're usually concerned with a badly formed message that doesn't deserialize correctly or otherwise causes a non-retriable error in the consumer. An idea we have considered is creating a single deadletter topic for the cluster. When a consumer gets a non-retriable error for a message, it would produce a message to that deadletter topic where the key would be group.id and the value would be a tuple of (topic, partition, offset) that failed to be consumed. We'd then be able to go back within the retention period, figure out why the message failed to be consumed, change our consumer logic, and reconsume that particular message if needed. > Implement a default queue for expired messages > -- > > Key: KAFKA-3895 > URL: https://issues.apache.org/jira/browse/KAFKA-3895 > Project: Kafka > Issue Type: New Feature > Components: core >Reporter: Krish Iyer >Priority: Minor > > As discussed in the mailing list, Kafka currently does not support a dead > letter queue-like feature to which all expired messages can be sent. > A high-level design of such a feature can be as follows: > When Kafka needs to expire a message from topic 'X', it sends the message to > the 'X-expired' topic, rather than deleting it. If 'X' is 'X-expired', dequeue > and requeue the message in the expired topic. > Given that this topic can receive duplicate messages (same expired message > multiple times), the consumer of this topic needs to ensure duplicate message > handling. > Open question: > Does the message need a timestamp to figure out when it moved from a > normal queue to a dead letter queue? Does it need a flag to specify whether > it had expired from the X-expired topic itself? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
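To make the dead-letter idea from the comment above concrete, here is a rough sketch. The "deadletter" topic name, record layout, and reporter class are illustrative assumptions only, not an agreed-upon design.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DeadLetterReporter {
    // Hypothetical cluster-wide dead-letter topic.
    private static final String DEADLETTER_TOPIC = "deadletter";

    private final Producer<String, String> producer;
    private final String groupId;

    public DeadLetterReporter(Properties producerConfig, String groupId) {
        // producerConfig is assumed to include bootstrap servers and string serializers.
        this.producer = new KafkaProducer<>(producerConfig);
        this.groupId = groupId;
    }

    /**
     * Called when a consumer hits a non-retriable error (e.g. a record that fails
     * to deserialize). Key is the consumer group id; value points back at the
     * record that could not be processed so it can be revisited later.
     */
    public void report(ConsumerRecord<?, ?> failed) {
        String pointer = failed.topic() + "," + failed.partition() + "," + failed.offset();
        producer.send(new ProducerRecord<>(DEADLETTER_TOPIC, groupId, pointer));
    }
}
{code}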
[jira] [Updated] (KAFKA-3753) Add approximateNumEntries() to the StateStore interface for metrics reporting
[ https://issues.apache.org/jira/browse/KAFKA-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Klukas updated KAFKA-3753: --- Summary: Add approximateNumEntries() to the StateStore interface for metrics reporting (was: Add size() to the StateStore interface for metrics reporting) > Add approximateNumEntries() to the StateStore interface for metrics reporting > - > > Key: KAFKA-3753 > URL: https://issues.apache.org/jira/browse/KAFKA-3753 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jeff Klukas >Assignee: Guozhang Wang >Priority: Minor > Labels: api > Fix For: 0.10.1.0 > > > As a developer building a Kafka Streams application, I'd like to have > visibility into what's happening with my state stores. How can I know if a > particular store is growing large? How can I know if a particular store is > frequently needing to hit disk? > I'm interested to know if there are existing mechanisms for extracting this > information or if other people have thoughts on how we might approach this. > I can't think of a way to provide metrics generically, so each state store > implementation would likely need to handle this separately. Given that the > default RocksDBStore will likely be the most-used, it would be a first target > for adding metrics. > I'd be interested in knowing the total number of entries in the store, the > total size on disk and in memory, rates of gets and puts, and hit/miss ratio > for the MemoryLRUCache. Some of these numbers are likely calculable through > the RocksDB API, others may simply not be accessible. > Would there be value to the wider community in having state stores register > metrics? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3801) Provide static serialize() and deserialize() for use as method references
[ https://issues.apache.org/jira/browse/KAFKA-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15325270#comment-15325270 ] Jeff Klukas commented on KAFKA-3801: I like concise code that you get from a static method reference: {{mapValues(LongDeserializer::deserialize)}} as opposed to {{mapValues(bytes -> Serdes.Long().deserializer().deserialize(bytes))}}, but you're correct that it's possible to use {{Serdes}} here inline. I'm fine to close this in deference to a more comprehensive solution down the line. > Provide static serialize() and deserialize() for use as method references > - > > Key: KAFKA-3801 > URL: https://issues.apache.org/jira/browse/KAFKA-3801 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Reporter: Jeff Klukas >Assignee: Guozhang Wang >Priority: Minor > Fix For: 0.10.1.0 > > > While most calls to {{Serializer.serialize}} and {{Deserializer.deserialize}} > are abstracted away in Kafka Streams through the use of `Serdes` classes, > there are some instances where developers may want to call them directly. The > serializers and deserializers for simple types don't require any > configuration and could be static, but currently it's necessary to create an > instance to use those methods. > I'd propose moving serialization logic into a {{static public byte[] > serialize(? data)}} method and deserialization logic into a {{static public ? > deserialize(byte[] data)}} method. The existing instance methods would simply > call the static versions. > See a full example for LongSerializer and LongDeserializer here: > https://github.com/apache/kafka/compare/trunk...jklukas:static-serde-methods?expand=1 > In Java 8, these static methods then become available for method references > in code like {{kstream.mapValues(LongDeserializer::deserialize)}} without the > user needing to create an instance of {{LongDeserializer}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3801) Provide static serialize() and deserialize() for use as method references
[ https://issues.apache.org/jira/browse/KAFKA-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15323718#comment-15323718 ] Jeff Klukas commented on KAFKA-3801: Let me give more context for the example. We have an application that produces JSON messages to a Kafka topic interleaved with occasional checkpoint messages that are of {{Long}} type. If I want to create a KStream of just the checkpoint messages, I need to filter out the JSON messages before deserializing. Here's what it looks like: {{KStream checkpointStream = builder.stream(Serdes.Long(), Serdes.ByteArray(), inputTopicName)}} {{.filter((key, bytes) -> bytes.length == 8).mapValues(LongDeserializer::deserialize)}} I need to use ByteArraySerde when calling {{stream}}, then I do the deserialization in a {{mapValues}} invocation after filtering out messages of the wrong type. Another option would be to materialize the stream to a topic after the filter and then call {{builder.stream(Serdes.Long(), Serdes.Long(), newTopicName)}}, but I'd like to avoid unnecessary materialization. So in the current scheme, I need to create an instance of {{LongDeserializer}} separately so that I can then call its {{deserialize}} method in {{mapValues}}. This situation probably won't occur frequently, so I understand if it's decided not to bother considering this change. > Provide static serialize() and deserialize() for use as method references > - > > Key: KAFKA-3801 > URL: https://issues.apache.org/jira/browse/KAFKA-3801 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Reporter: Jeff Klukas >Assignee: Guozhang Wang >Priority: Minor > Fix For: 0.10.1.0 > > > While most calls to {{Serializer.serialize}} and {{Deserializer.deserialize}} > are abstracted away in Kafka Streams through the use of `Serdes` classes, > there are some instances where developers may want to call them directly. The > serializers and deserializers for simple types don't require any > configuration and could be static, but currently it's necessary to create an > instance to use those methods. > I'd propose moving serialization logic into a {{static public byte[] > serialize(? data)}} method and deserialization logic into a {{static public ? > deserialize(byte[] data)}} method. The existing instance methods would simply > call the static versions. > See a full example for LongSerializer and LongDeserializer here: > https://github.com/apache/kafka/compare/trunk...jklukas:static-serde-methods?expand=1 > In Java 8, these static methods then become available for method references > in code like {{kstream.mapValues(LongDeserializer::deserialize)}} without the > user needing to create an instance of {{LongDeserializer}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
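For reference, the workaround described in the comment looks roughly like the following against the 0.10-era KStreamBuilder API: the deserializer instance has to be constructed up front so it can be used inside {{mapValues}}. The topic name, generics, and null guard are assumptions for the sketch.

{code:java}
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class CheckpointStreamExample {

    public static KStream<Long, Long> checkpointStream(KStreamBuilder builder, String inputTopicName) {
        // Without static deserialize methods, an instance must be created up front
        // so it can be referenced inside mapValues.
        final LongDeserializer longDeserializer = new LongDeserializer();

        return builder.stream(Serdes.Long(), Serdes.ByteArray(), inputTopicName)
                // Checkpoint messages are 8-byte longs; interleaved JSON messages are not.
                .filter((key, bytes) -> bytes != null && bytes.length == 8)
                // With the proposed static method this could become
                // .mapValues(LongDeserializer::deserialize) instead.
                .mapValues(bytes -> longDeserializer.deserialize(null, bytes));
    }
}
{code}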
[jira] [Commented] (KAFKA-3753) Add size() to the StateStore interface for metrics reporting
[ https://issues.apache.org/jira/browse/KAFKA-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15323342#comment-15323342 ] Jeff Klukas commented on KAFKA-3753: I agree that {{size}} is a bit ambiguous. I'd expect many folks would recognize the analogy to Java's {{Map.size}}, but there's no reason we need to stick with that. RocksDB has nice consistency in their naming of properties, where {{size}} refers to bytes and {{num-entries}} refers to a count. The PR discussion also brings up the point that since we can only get an estimated count from RocksDB, the name of this method should indicate that the result is not necessarily exact. I'd be happy to see this method called {{estimatedCount}}, {{estimatedNumEntries}}, {{approximateCount}}, or some other variant. > Add size() to the StateStore interface for metrics reporting > > > Key: KAFKA-3753 > URL: https://issues.apache.org/jira/browse/KAFKA-3753 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jeff Klukas >Assignee: Guozhang Wang >Priority: Minor > Labels: api > Fix For: 0.10.1.0 > > > As a developer building a Kafka Streams application, I'd like to have > visibility into what's happening with my state stores. How can I know if a > particular store is growing large? How can I know if a particular store is > frequently needing to hit disk? > I'm interested to know if there are existing mechanisms for extracting this > information or if other people have thoughts on how we might approach this. > I can't think of a way to provide metrics generically, so each state store > implementation would likely need to handle this separately. Given that the > default RocksDBStore will likely be the most-used, it would be a first target > for adding metrics. > I'd be interested in knowing the total number of entries in the store, the > total size on disk and in memory, rates of gets and puts, and hit/miss ratio > for the MemoryLRUCache. Some of these numbers are likely calculable through > the RocksDB API, others may simply not be accessible. > Would there be value to the wider community in having state stores register > metrics? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3817) KTableRepartitionMap should handle null inputs
Jeff Klukas created KAFKA-3817: -- Summary: KTableRepartitionMap should handle null inputs Key: KAFKA-3817 URL: https://issues.apache.org/jira/browse/KAFKA-3817 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.10.0.0 Reporter: Jeff Klukas Assignee: Guozhang Wang Fix For: 0.10.0.1 When calling {{KTable.groupBy}} on the result of a KTable-KTable join, NPEs are raised: {{org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:88)}} The root cause is that the join is expected to emit null values when no match is found, but KTableRepartitionMap is not set up to handle this case. On the users email list, [~guozhang] described a plan of action: I think this is actually a bug in KTableRepartitionMap that it actually should expect null grouped keys; this would be a straight-forward fix for this operator, but I can make a pass over all the repartition operators just to make sure they are all gracefully handling null keys. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3753) Metrics for StateStores
[ https://issues.apache.org/jira/browse/KAFKA-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15322683#comment-15322683 ] Jeff Klukas commented on KAFKA-3753: Since there's refactoring going on right now with the metrics interface for streams (https://issues.apache.org/jira/browse/KAFKA-3715), I think we should defer actually adding size metrics to a separate issue. The PR I attached here adds the size() method so that it can be used for a metric in the future. > Metrics for StateStores > --- > > Key: KAFKA-3753 > URL: https://issues.apache.org/jira/browse/KAFKA-3753 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jeff Klukas >Assignee: Guozhang Wang >Priority: Minor > Labels: api > Fix For: 0.10.1.0 > > > As a developer building a Kafka Streams application, I'd like to have > visibility into what's happening with my state stores. How can I know if a > particular store is growing large? How can I know if a particular store is > frequently needing to hit disk? > I'm interested to know if there are existing mechanisms for extracting this > information or if other people have thoughts on how we might approach this. > I can't think of a way to provide metrics generically, so each state store > implementation would likely need to handle this separately. Given that the > default RocksDBStore will likely be the most-used, it would be a first target > for adding metrics. > I'd be interested in knowing the total number of entries in the store, the > total size on disk and in memory, rates of gets and puts, and hit/miss ratio > for the MemoryLRUCache. Some of these numbers are likely calculable through > the RocksDB API, others may simply not be accessible. > Would there be value to the wider community in having state stores register > metrics? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3801) Provide static serialize() and deserialize() for use as method references
Jeff Klukas created KAFKA-3801: -- Summary: Provide static serialize() and deserialize() for use as method references Key: KAFKA-3801 URL: https://issues.apache.org/jira/browse/KAFKA-3801 Project: Kafka Issue Type: Improvement Components: clients, streams Reporter: Jeff Klukas Assignee: Guozhang Wang Priority: Minor Fix For: 0.10.1.0 While most calls to {{Serializer.serialize}} and {{Deserializer.deserialize}} are abstracted away in Kafka Streams through the use of `Serdes` classes, there are some instances where developers may want to call them directly. The serializers and deserializers for simple types don't require any configuration and could be static, but currently it's necessary to create an instance to use those methods. I'd propose moving serialization logic into a {{static public byte[] serialize(? data)}} method and deserialization logic into a {{static public ? deserialize(byte[] data)}} method. The existing instance methods would simply call the static versions. See a full example for LongSerializer and LongDeserializer here: https://github.com/apache/kafka/compare/trunk...jklukas:static-serde-methods?expand=1 In Java 8, these static methods then become available for method references in code like {{kstream.mapValues(LongDeserializer::deserialize)}} without the user needing to create an instance of {{LongDeserializer}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
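A rough sketch of the shape of the proposed static methods, using long values as in the linked example. This standalone class only illustrates the API shape; the actual proposal modified Kafka's LongSerializer/LongDeserializer in place.

{code:java}
import java.nio.ByteBuffer;

public class LongSerde {

    // Static serialization logic that instance methods could delegate to.
    public static byte[] serialize(Long data) {
        if (data == null) {
            return null;
        }
        // Big-endian 8-byte encoding, matching Kafka's built-in long serializer.
        return ByteBuffer.allocate(8).putLong(data).array();
    }

    public static Long deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length != 8) {
            throw new IllegalArgumentException("Expected 8 bytes for a long, got " + data.length);
        }
        return ByteBuffer.wrap(data).getLong();
    }
}
{code}

With methods like these in place, a method reference such as {{mapValues(LongSerde::deserialize)}} becomes possible without constructing a deserializer instance first.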
[jira] [Commented] (KAFKA-3711) Allow configuration of MetricsReporter subclasses
[ https://issues.apache.org/jira/browse/KAFKA-3711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15318768#comment-15318768 ] Jeff Klukas commented on KAFKA-3711: I submitted a PR above to call {{originals()}} when getting configured instances. Documentation changes can be a separate issue. > Allow configuration of MetricsReporter subclasses > - > > Key: KAFKA-3711 > URL: https://issues.apache.org/jira/browse/KAFKA-3711 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Reporter: Jeff Klukas >Assignee: Guozhang Wang >Priority: Minor > Fix For: 0.10.1.0 > > > The current interface for attaching metrics reporters to clients allows only > defining a list of class names, but provides no means for configuring those > reporters. > There is at least one existing project > (https://github.com/apakulov/kafka-graphite) that solves this problem by > passing additional properties into the client, which then get passed on to > the reporter. This seems to work quite well, but it generates warnings like > {{The configuration kafka.graphite.metrics.prefix = foo was supplied but > isn't a known config.}} > Should passing arbitrary additional parameters like this be officially > supported as the way to configure metrics reporters? Should these warnings > about unrecognized parameters be removed? > Perhaps there should be some mechanism for registering additional > configuration parameters for clients to expect? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3711) Allow configuration of MetricsReporter subclasses
[ https://issues.apache.org/jira/browse/KAFKA-3711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15318546#comment-15318546 ] Jeff Klukas commented on KAFKA-3711: I would love to see this fixed as well. I'd also love to see some more documentation about Kafka's config framework and how various interfaces like {{MetricsReporter}} are intended to be used. Specifically, I'd like to see the developer docs describe that it's intended that user-provided classes can define configuration options and provide some advice about how to name those options. I'd be interested in contributing docs and cleaning up the config code if it's clear what needs to be done. Am I correct in understanding that the code change needed here is to ensure that {{originals()}} is called rather than {{this.originals}} everywhere that we're passing configs on in {{AbstractConfig}}? > Allow configuration of MetricsReporter subclasses > - > > Key: KAFKA-3711 > URL: https://issues.apache.org/jira/browse/KAFKA-3711 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Reporter: Jeff Klukas >Assignee: Guozhang Wang >Priority: Minor > Fix For: 0.10.1.0 > > > The current interface for attaching metrics reporters to clients allows only > defining a list of class names, but provides no means for configuring those > reporters. > There is at least one existing project > (https://github.com/apakulov/kafka-graphite) that solves this problem by > passing additional properties into the client, which then get passed on to > the reporter. This seems to work quite well, but it generates warnings like > {{The configuration kafka.graphite.metrics.prefix = foo was supplied but > isn't a known config.}} > Should passing arbitrary additional parameters like this be officially > supported as the way to configure metrics reporters? Should these warnings > about unrecognized parameters be removed? > Perhaps there should be some mechanism for registering additional > configuration parameters for clients to expect? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
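As an illustration of the pattern under discussion, here is a bare-bones reporter that reads a custom property in {{configure()}}. The property name is invented for the example, and whether it ever arrives depends on the client passing its full {{originals()}} map through to user-supplied classes rather than only the known config keys.

{code:java}
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

public class PrefixedLoggingReporter implements MetricsReporter {
    // Hypothetical custom config key, analogous to kafka.graphite.metrics.prefix.
    private static final String PREFIX_CONFIG = "my.reporter.metrics.prefix";

    private String prefix = "";

    @Override
    public void configure(Map<String, ?> configs) {
        // Only visible if the client hands the reporter its original config map.
        Object value = configs.get(PREFIX_CONFIG);
        if (value != null) {
            prefix = value.toString();
        }
    }

    @Override
    public void init(List<KafkaMetric> metrics) {
        metrics.forEach(m -> System.out.println(prefix + m.metricName().name()));
    }

    @Override
    public void metricChange(KafkaMetric metric) {
        System.out.println(prefix + metric.metricName().name() + " = " + metric.metricValue());
    }

    @Override
    public void metricRemoval(KafkaMetric metric) { }

    @Override
    public void close() { }
}
{code}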
[jira] [Commented] (KAFKA-3753) Metrics for StateStores
[ https://issues.apache.org/jira/browse/KAFKA-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15318521#comment-15318521 ] Jeff Klukas commented on KAFKA-3753: It's good to know about MeteredKeyValueStore. And holding off on cache-related metrics for the redesign sounds reasonable. It looks like the KeyValueStore interface does not provide any method related to number of entries (besides calling `all()` and iterating over all values, which doesn't seem reasonable), so there's no way for MeteredKeyValueStore to access the number of entries in the wrapped store. Could we consider adding a `size` method to the KeyValueStore interface? > Metrics for StateStores > --- > > Key: KAFKA-3753 > URL: https://issues.apache.org/jira/browse/KAFKA-3753 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jeff Klukas >Assignee: Guozhang Wang >Priority: Minor > Labels: api > Fix For: 0.10.1.0 > > > As a developer building a Kafka Streams application, I'd like to have > visibility into what's happening with my state stores. How can I know if a > particular store is growing large? How can I know if a particular store is > frequently needing to hit disk? > I'm interested to know if there are existing mechanisms for extracting this > information or if other people have thoughts on how we might approach this. > I can't think of a way to provide metrics generically, so each state store > implementation would likely need to handle this separately. Given that the > default RocksDBStore will likely be the most-used, it would be a first target > for adding metrics. > I'd be interested in knowing the total number of entries in the store, the > total size on disk and in memory, rates of gets and puts, and hit/miss ratio > for the MemoryLRUCache. Some of these numbers are likely calculable through > the RocksDB API, others may simply not be accessible. > Would there be value to the wider community in having state stores register > metrics? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
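For reference, RocksDB already exposes an estimated key count as a database property, so a store-level method along the lines discussed here could be backed by something like the following. The wiring is illustrative; per the related summary change on this ticket, the interface method was ultimately named approximateNumEntries().

{code:java}
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class RocksDbEntryCount {

    /**
     * Returns RocksDB's estimate of the number of keys in the store.
     * The value is approximate by design, which is why an exact-sounding
     * name like size() would be misleading.
     */
    public static long approximateNumEntries(RocksDB db) {
        try {
            return db.getLongProperty("rocksdb.estimate-num-keys");
        } catch (RocksDBException e) {
            throw new RuntimeException("Failed to read entry-count estimate", e);
        }
    }
}
{code}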
[jira] [Created] (KAFKA-3753) Metrics for StateStores
Jeff Klukas created KAFKA-3753: -- Summary: Metrics for StateStores Key: KAFKA-3753 URL: https://issues.apache.org/jira/browse/KAFKA-3753 Project: Kafka Issue Type: Improvement Components: streams Reporter: Jeff Klukas Assignee: Guozhang Wang Priority: Minor Fix For: 0.10.1.0 As a developer building a Kafka Streams application, I'd like to have visibility into what's happening with my state stores. How can I know if a particular store is growing large? How can I know if a particular store is frequently needing to hit disk? I'm interested to know if there are existing mechanisms for extracting this information or if other people have thoughts on how we might approach this. I can't think of a way to provide metrics generically, so each state store implementation would likely need to handle this separately. Given that the default RocksDBStore will likely be the most-used, it would be a first target for adding metrics. I'd be interested in knowing the total number of entries in the store, the total size on disk and in memory, rates of gets and puts, and hit/miss ratio for the MemoryLRUCache. Some of these numbers are likely calculable through the RocksDB API, others may simply not be accessible. Would there be value to the wider community in having state stores register metrics? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3714) Allow users greater access to register custom streams metrics
[ https://issues.apache.org/jira/browse/KAFKA-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Klukas updated KAFKA-3714: --- Issue Type: Improvement (was: Bug) > Allow users greater access to register custom streams metrics > - > > Key: KAFKA-3714 > URL: https://issues.apache.org/jira/browse/KAFKA-3714 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jeff Klukas >Assignee: Guozhang Wang >Priority: Minor > Fix For: 0.10.1.0 > > > Copying in some discussion that originally appeared in > https://github.com/apache/kafka/pull/1362#issuecomment-219064302 > Kafka Streams is largely a higher-level abstraction on top of producers and > consumers, and it seems sensible to match the KafkaStreams interface to that > of KafkaProducer and KafkaConsumer where possible. For producers and > consumers, the metric registry is internal and metrics are only exposed as an > unmodifiable map. This allows users to access client metric values for use in > application health checks, etc., but doesn't allow them to register new > metrics. > That approach seems reasonable if we assume that a user interested in > defining custom metrics is already going to be using a separate metrics > library. In such a case, users will likely find it easier to define metrics > using whatever library they're familiar with rather than learning the API for > Kafka's Metrics class. Is this a reasonable assumption? > If we want to expose the Metrics instance so that users can define arbitrary > metrics, I'd argue that there's need for documentation updates. In > particular, I find the notion of metric tags confusing. Tags can be defined > in a MetricConfig when the Metrics instance is constructed, > StreamsMetricsImpl is maintaining its own set of tags, and users can set tag > overrides. > If a user were to get access to the Metrics instance, they would be missing > the tags defined in StreamsMetricsImpl. I'm imagining that users would want > their custom metrics to sit alongside the predefined metrics with the same > tags, and users shouldn't be expected to manage those additional tags > themselves. > So, why are we allowing users to define their own metrics via the > StreamsMetrics interface in the first place? Is it that we'd like to be able > to provide a built-in latency metric, but the definition depends on the > details of the use case so there's no generic solution? That would be > sufficient motivation for this special case of addLatencySensor. If we want > to continue down that path and give users access to define a wider range of > custom metrics, I'd prefer to extend the StreamsMetrics interface so that > users can call methods on that object, automatically getting the tags > appropriate for that instance rather than interacting with the raw Metrics > instance. > --- > Guozhang had the following comments: > 1) For the producer/consumer cases, all internal metrics are provided and > abstracted from users, and they just need to read the documentation to poll > whatever provided metrics that they are interested; and if they want to > define more metrics, they are likely to be outside the clients themselves and > they can use whatever methods they like, so Metrics do not need to be exposed > to users. 
> 2) For streams, things are a bit different: users define the computational > logic, which becomes part of the "Streams Client" processing and may be of > interests to be monitored by user themselves; think of a customized processor > that sends an email to some address based on a condition, and users want to > monitor the average rate of emails sent. Hence it is worth considering > whether or not they should be able to access the Metrics instance to define > their own along side the pre-defined metrics provided by the library. > 3) Now, since the Metrics class was not previously designed for public usage, > it is not designed to be very user-friendly for defining sensors, especially > the semantics differences between name / scope / tags. StreamsMetrics tries > to hide some of these semantics confusion from users, but it still expose > tags and hence is not perfect in doing so. We need to think of a better > approach so that: 1) user defined metrics will be "aligned" (i.e. with the > same name prefix within a single application, with similar scope hierarchy > definition, etc) with library provided metrics, 2) natural APIs to do so. > I do not have concrete ideas about 3) above on top of my head, comments are > more than welcomed. > --- > I'm not sure that I agree that 1) and 2) are truly different situations. A > user might choose to send email messages within a bare consumer rather than a > streams application, and still want to maintain a metric of sent emails. In this bare > consumer case, we'd expect the user to define that email-sent metric outside of Kafka's > metrics machinery. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3715) Higher granularity streams metrics
[ https://issues.apache.org/jira/browse/KAFKA-3715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15284706#comment-15284706 ] Jeff Klukas commented on KAFKA-3715: It would be interesting to work on this, but I won't likely have time in the near future, so others should feel free to implement these ideas. > Higher granularity streams metrics > --- > > Key: KAFKA-3715 > URL: https://issues.apache.org/jira/browse/KAFKA-3715 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jeff Klukas >Assignee: Guozhang Wang >Priority: Minor > Fix For: 0.10.1.0 > > > Originally proposed by [~guozhang] in > https://github.com/apache/kafka/pull/1362#issuecomment-218326690 > We can consider adding metrics for process / punctuate / commit rate at the > granularity of each processor node in addition to the global rate mentioned > above. This is very helpful in debugging. > We can consider adding rate / total cumulated metrics for context.forward > indicating how many records were forwarded downstream from this processor > node as well. This is helpful in debugging. > We can consider adding metrics for each stream partition's timestamp. This is > helpful in debugging. > Besides the latency metrics, we can also add throughput latency in terms of > source records consumed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3715) Higher granularity streams metrics
Jeff Klukas created KAFKA-3715: -- Summary: Higher granularity streams metrics Key: KAFKA-3715 URL: https://issues.apache.org/jira/browse/KAFKA-3715 Project: Kafka Issue Type: Improvement Components: streams Reporter: Jeff Klukas Assignee: Guozhang Wang Priority: Minor Fix For: 0.10.1.0 Originally proposed by [~guozhang] in https://github.com/apache/kafka/pull/1362#issuecomment-218326690 We can consider adding metrics for process / punctuate / commit rate at the granularity of each processor node in addition to the global rate mentioned above. This is very helpful in debugging. We can consider adding rate / total cumulated metrics for context.forward indicating how many records were forwarded downstream from this processor node as well. This is helpful in debugging. We can consider adding metrics for each stream partition's timestamp. This is helpful in debugging. Besides the latency metrics, we can also add throughput latency in terms of source records consumed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3714) Allow users greater access to register custom streams metrics
Jeff Klukas created KAFKA-3714: -- Summary: Allow users greater access to register custom streams metrics Key: KAFKA-3714 URL: https://issues.apache.org/jira/browse/KAFKA-3714 Project: Kafka Issue Type: Bug Components: streams Reporter: Jeff Klukas Assignee: Guozhang Wang Priority: Minor Fix For: 0.10.1.0 Copying in some discussion that originally appeared in https://github.com/apache/kafka/pull/1362#issuecomment-219064302 Kafka Streams is largely a higher-level abstraction on top of producers and consumers, and it seems sensible to match the KafkaStreams interface to that of KafkaProducer and KafkaConsumer where possible. For producers and consumers, the metric registry is internal and metrics are only exposed as an unmodifiable map. This allows users to access client metric values for use in application health checks, etc., but doesn't allow them to register new metrics. That approach seems reasonable if we assume that a user interested in defining custom metrics is already going to be using a separate metrics library. In such a case, users will likely find it easier to define metrics using whatever library they're familiar with rather than learning the API for Kafka's Metrics class. Is this a reasonable assumption? If we want to expose the Metrics instance so that users can define arbitrary metrics, I'd argue that there's need for documentation updates. In particular, I find the notion of metric tags confusing. Tags can be defined in a MetricConfig when the Metrics instance is constructed, StreamsMetricsImpl is maintaining its own set of tags, and users can set tag overrides. If a user were to get access to the Metrics instance, they would be missing the tags defined in StreamsMetricsImpl. I'm imagining that users would want their custom metrics to sit alongside the predefined metrics with the same tags, and users shouldn't be expected to manage those additional tags themselves. So, why are we allowing users to define their own metrics via the StreamsMetrics interface in the first place? Is it that we'd like to be able to provide a built-in latency metric, but the definition depends on the details of the use case so there's no generic solution? That would be sufficient motivation for this special case of addLatencySensor. If we want to continue down that path and give users access to define a wider range of custom metrics, I'd prefer to extend the StreamsMetrics interface so that users can call methods on that object, automatically getting the tags appropriate for that instance rather than interacting with the raw Metrics instance. --- Guozhang had the following comments: 1) For the producer/consumer cases, all internal metrics are provided and abstracted from users, and they just need to read the documentation to poll whatever provided metrics that they are interested; and if they want to define more metrics, they are likely to be outside the clients themselves and they can use whatever methods they like, so Metrics do not need to be exposed to users. 2) For streams, things are a bit different: users define the computational logic, which becomes part of the "Streams Client" processing and may be of interests to be monitored by user themselves; think of a customized processor that sends an email to some address based on a condition, and users want to monitor the average rate of emails sent. Hence it is worth considering whether or not they should be able to access the Metrics instance to define their own along side the pre-defined metrics provided by the library. 
3) Now, since the Metrics class was not previously designed for public usage, it is not designed to be very user-friendly for defining sensors, especially the semantics differences between name / scope / tags. StreamsMetrics tries to hide some of these semantics confusion from users, but it still expose tags and hence is not perfect in doing so. We need to think of a better approach so that: 1) user defined metrics will be "aligned" (i.e. with the same name prefix within a single application, with similar scope hierarchy definition, etc) with library provided metrics, 2) natural APIs to do so. I do not have concrete ideas about 3) above on top of my head, comments are more than welcomed. --- I'm not sure that I agree that 1) and 2) are truly different situations. A user might choose to send email messages within a bare consumer rather than a streams application, and still want to maintain a metric of sent emails. In this bare consumer case, we'd expect the user to define that email-sent metric outside of Kafka's metrics machinery. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
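To ground the discussion, this is roughly what the email-rate example looks like when defined directly against the raw Metrics class, including the tags a user would have to manage by hand today. The sensor name, group, and tags are invented for the example and would not automatically line up with what StreamsMetricsImpl uses internally.

{code:java}
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

public class EmailMetricsExample {

    public static Sensor emailSentSensor(Metrics metrics, String clientId) {
        // Tags the user must supply themselves; StreamsMetricsImpl keeps its own
        // tag set, so user-defined metrics can easily end up misaligned with it.
        Map<String, String> tags = Collections.singletonMap("client-id", clientId);

        Sensor sensor = metrics.sensor("emails-sent");
        MetricName name = metrics.metricName(
                "email-sent-rate", "custom-metrics", "Rate of notification emails sent", tags);
        sensor.add(name, new Rate());
        return sensor;
    }
}

// Inside the processor that sends the email, each send would call:
//   emailSentSensor.record();
{code}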
[jira] [Created] (KAFKA-3711) Allow configuration of MetricsReporter subclasses
Jeff Klukas created KAFKA-3711: -- Summary: Allow configuration of MetricsReporter subclasses Key: KAFKA-3711 URL: https://issues.apache.org/jira/browse/KAFKA-3711 Project: Kafka Issue Type: Improvement Components: clients, streams Reporter: Jeff Klukas Assignee: Guozhang Wang Priority: Minor Fix For: 0.10.1.0 The current interface for attaching metrics reporters to clients allows only defining a list of class names, but provides no means for configuring those reporters. There is at least one existing project (https://github.com/apakulov/kafka-graphite) that solves this problem by passing additional properties into the client, which then get passed on to the reporter. This seems to work quite well, but it generates warnings like {{The configuration kafka.graphite.metrics.prefix = foo was supplied but isn't a known config.}} Should passing arbitrary additional parameters like this be officially supported as the way to configure metrics reporters? Should these warnings about unrecognized parameters be removed? Perhaps there should be some mechanism for registering additional configuration parameters for clients to expect? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3701) Expose KafkaStreams metrics in public API
Jeff Klukas created KAFKA-3701: -- Summary: Expose KafkaStreams metrics in public API Key: KAFKA-3701 URL: https://issues.apache.org/jira/browse/KAFKA-3701 Project: Kafka Issue Type: Improvement Components: streams Reporter: Jeff Klukas Assignee: Guozhang Wang Priority: Minor Fix For: 0.10.1.0 The Kafka clients expose their metrics registries through a `metrics` method presenting an unmodifiable collection, but `KafkaStreams` does not expose its registry. Currently, applications can access a StreamsMetrics instance via the ProcessorContext within a Processor, but this limits flexibility. Having read-only access to a KafkaStreams.metrics() method would allow a developer to define a health check for their application based on the metrics that KafkaStreams is collecting. Or a developer might want to define a metric in some other framework based on KafkaStreams' metrics. I am imagining that an application would build and register KafkaStreams-based health checks after building a KafkaStreams instance but before calling the start() method. Are metrics added to the registry at the time a KafkaStreams instance is constructed, or only after calling the start() method? If metrics are registered only after application startup, then this approach may not be sufficient. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
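A sketch of the kind of health check this would enable, assuming a {{KafkaStreams#metrics()}} accessor shaped like the one on KafkaProducer/KafkaConsumer (an unmodifiable map of MetricName to Metric). The metric name and threshold are illustrative only.

{code:java}
import java.util.Map;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class StreamsHealthCheck {

    /**
     * Reports the application unhealthy if the average commit latency of any
     * stream thread exceeds the given threshold.
     */
    public static boolean isHealthy(KafkaStreams streams, double maxCommitLatencyMs) {
        Map<MetricName, ? extends Metric> metrics = streams.metrics();
        return metrics.entrySet().stream()
                .filter(e -> e.getKey().name().equals("commit-latency-avg"))
                .allMatch(e -> ((Number) e.getValue().metricValue()).doubleValue() <= maxCommitLatencyMs);
    }
}
{code}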
[jira] [Commented] (KAFKA-3625) Move kafka-streams test fixtures into a published package
[ https://issues.apache.org/jira/browse/KAFKA-3625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15258338#comment-15258338 ] Jeff Klukas commented on KAFKA-3625: I would be willing to submit this patch if folks think it has merit and there's agreement as to how the package should be named. > Move kafka-streams test fixtures into a published package > - > > Key: KAFKA-3625 > URL: https://issues.apache.org/jira/browse/KAFKA-3625 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jeff Klukas >Assignee: Guozhang Wang >Priority: Minor > Fix For: 0.10.0.0 > > > The KStreamTestDriver and related fixtures defined in > streams/src/test/java/org/apache/kafka/test would be useful to developers > building applications on top of Kafka Streams, but they are not currently > exposed in a package. > I propose moving this directory to live under streams/fixtures/src/main and > creating a new 'streams:fixtures' project in the gradle configuration to > publish these as a separate package. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3625) Move kafka-streams test fixtures into a published package
Jeff Klukas created KAFKA-3625: -- Summary: Move kafka-streams test fixtures into a published package Key: KAFKA-3625 URL: https://issues.apache.org/jira/browse/KAFKA-3625 Project: Kafka Issue Type: Improvement Components: streams Reporter: Jeff Klukas Assignee: Guozhang Wang Priority: Minor Fix For: 0.10.0.0 The KStreamTestDriver and related fixtures defined in streams/src/test/java/org/apache/kafka/test would be useful to developers building applications on top of Kafka Streams, but they are not currently exposed in a package. I propose moving this directory to live under streams/fixtures/src/main and creating a new 'streams:fixtures' project in the gradle configuration to publish these as a separate package. -- This message was sent by Atlassian JIRA (v6.3.4#6332)