[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec
[ https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817385#comment-17817385 ] Tommy Becker commented on KAFKA-13505: -- I'm wondering if this bug is being overlooked because it's described in terms of third-party libraries like Confluent's S3 connector and Avro. But the issue is actually in the SchemaProjector class, which is part of Kafka Connect itself. Confluent's AvroConverter represents Avro enums in Connect Schema as Strings with "parameters" corresponding to each enum value. Even though these parameters seem like they are just intended to be metadata, SchemaProjector.checkMaybeCompatible() requires the parameters of each field to *exactly* match, so when values are added to an Avro enum, new parameters are added to the generated Connect Schema, breaking this test. Intuitively, it feels like this check could be less strict, but there is no specification for Connect Schema that I can find, so I don't know whether requiring this parameter equality is correct. > Kafka Connect should respect Avro 1.10.X enum defaults spec > --- > > Key: KAFKA-13505 > URL: https://issues.apache.org/jira/browse/KAFKA-13505 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Guus De Graeve >Priority: Major > > We are using Kafka Connect to pipe data from Kafka topics into parquet files > on S3. Our Kafka data is serialised using Avro (schema registry). We use the > Amazon S3 Sink Connector for this. > Up until recently we would set "schema.compatibility" to "NONE" in our > connectors, but this had the painful side effect that during deploys of our > application we got huge file explosions (lots of very small files in HDFS / > S3). This happens because Kafka Connect will create a new file every time the > schema id of a log changes compared to the previous log. 
During deploys of > our applications (which can take up to 20 minutes) multiple logs of mixed > schema ids are inevitable, and given the huge volume of logs, file explosions > of up to a million files weren't uncommon. > To solve this problem we switched all our connectors' "schema.compatibility" > to "BACKWARD", which should only create a new file when a higher schema id is > detected and deserialise all logs with the latest known schema id, which > should create only one new file during deploys. > An example connector config: > {code:java} > { > "name": "hdfs-Project_Test_Subject", > "config": { > "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", > "partition.duration.ms": "8640", > "topics.dir": "/user/kafka/Project", > "hadoop.conf.dir": "/opt/hadoop/conf", > "flush.size": "100", > "schema.compatibility": "BACKWARD", > "topics": "Project_Test_Subject", > "timezone": "UTC", > "hdfs.url": "hdfs://hadoophost:9000", > "value.converter.value.subject.name.strategy": > "io.confluent.kafka.serializers.subject.TopicNameStrategy", > "rotate.interval.ms": "720", > "locale": "C", > "hadoop.home": "/opt/hadoop", > "logs.dir": "/user/kafka/_logs", > "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat", > "partitioner.class": > "io.confluent.connect.storage.partitioner.TimeBasedPartitioner", > "name": "hdfs-Project_Test_Subject", > "errors.tolerance": "all", > "storage.class": "io.confluent.connect.hdfs.storage.HdfsStorage", > "path.format": "/MM/dd" > } > }{code} > However, we have lots of enum fields in our data records (Avro schemas) to > which subjects get added frequently, and this is causing issues with our > Kafka Connect connectors FAILING with these kinds of errors: > {code:java} > Schema parameters not equal. 
source parameters: > {io.confluent.connect.avro.enum.default.testfield=null, > io.confluent.connect.avro.Enum=Ablo.testfield, > io.confluent.connect.avro.Enum.null=null, > io.confluent.connect.avro.Enum.value1=value1, > io.confluent.connect.avro.Enum.value2=value2} and target parameters: > {io.confluent.connect.avro.enum.default.testfield=null, > io.confluent.connect.avro.Enum=Ablo.testfield, > io.confluent.connect.avro.Enum.null=null, > io.confluent.connect.avro.Enum.value1=value1, > io.confluent.connect.avro.Enum.value2=value2, > io.confluent.connect.avro.Enum.value3=value3}{code} > Since the Avro 1.10.X specification, enum values support defaults, which makes > schema evolution possible even when adding subjects (values) to an enum. When > testing our schemas for compatibility using the Schema Registry API we always > get "is_compatible" => true, so schema evolution should in theory not be a > problem. > The error above
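The strict equality check described in the comment above can be sketched as a small model (illustrative Python, not the actual Java code in SchemaProjector; the parameter maps are copied from the error message in the description, and the relaxed variant is only a suggestion, not an agreed fix):

```python
# Connect schema parameters before and after a value is added to the Avro enum.
source_params = {
    "io.confluent.connect.avro.enum.default.testfield": "null",
    "io.confluent.connect.avro.Enum": "Ablo.testfield",
    "io.confluent.connect.avro.Enum.null": "null",
    "io.confluent.connect.avro.Enum.value1": "value1",
    "io.confluent.connect.avro.Enum.value2": "value2",
}
target_params = {**source_params, "io.confluent.connect.avro.Enum.value3": "value3"}

def check_maybe_compatible(source, target):
    # Models the current behavior: parameter maps must be *exactly* equal,
    # so a single added enum value fails the projection.
    if source != target:
        raise ValueError("Schema parameters not equal")

def check_superset_compatible(source, target):
    # A hypothetical less-strict check: every source parameter must survive
    # in the target, but the target may add new ones (e.g. new enum values).
    return all(target.get(k) == v for k, v in source.items())
```

Under the superset rule the evolved schema would project cleanly, while the exact-equality rule rejects it, which matches the connector failures described above.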
[jira] [Commented] (KAFKA-16018) KafkaStreams can go into a zombie state if UncaughtExceptionHandler is specified via the deprecated method
[ https://issues.apache.org/jira/browse/KAFKA-16018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798636#comment-17798636 ] Tommy Becker commented on KAFKA-16018: -- I think that's true, but wasn't sure on the timing of 4.0. > KafkaStreams can go into a zombie state if UncaughtExceptionHandler is > specified via the deprecated method > -- > > Key: KAFKA-16018 > URL: https://issues.apache.org/jira/browse/KAFKA-16018 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.2 >Reporter: Tommy Becker >Priority: Major > > We have a streams application in which all StreamThreads died due to a lack > of disk space. To our surprise, the KafkaStreams instance still reported its > state as running. Upon further investigation, it appears this is due to the > application setting an UncaughtExceptionHandler via the deprecated method > (this application was recently upgraded from 2.4.1): > [https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#setUncaughtExceptionHandler(java.lang.Thread.UncaughtExceptionHandler)] > The only way a StreamThread failure can cause the KafkaStreams instance to > transition to an error state now is via the new > StreamsUncaughtExceptionHandler machinery, but when an old > UncaughtExceptionHandler is set by the old method this code is effectively > bypassed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16018) KafkaStreams can go into a zombie state if UncaughtExceptionHandler is specified via the deprecated method
Tommy Becker created KAFKA-16018: Summary: KafkaStreams can go into a zombie state if UncaughtExceptionHandler is specified via the deprecated method Key: KAFKA-16018 URL: https://issues.apache.org/jira/browse/KAFKA-16018 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.3.2 Reporter: Tommy Becker We have a streams application in which all StreamThreads died due to a lack of disk space. To our surprise, the KafkaStreams instance still reported its state as running. Upon further investigation, it appears this is due to the application setting an UncaughtExceptionHandler via the deprecated method (this application was recently upgraded from 2.4.1): [https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#setUncaughtExceptionHandler(java.lang.Thread.UncaughtExceptionHandler)] The only way a StreamThread failure can cause the KafkaStreams instance to transition to an error state now is via the new StreamsUncaughtExceptionHandler machinery, but when an old UncaughtExceptionHandler is set by the old method this code is effectively bypassed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
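The failure mode in this report can be sketched as a toy state model (illustrative Python; class and state names are simplified stand-ins, not the real KafkaStreams API):

```python
class StreamsStateModel:
    """Toy model of the reported behavior: a legacy uncaught-exception
    handler bypasses the state transition performed by the newer
    StreamsUncaughtExceptionHandler machinery."""

    def __init__(self, legacy_handler=None):
        self.state = "RUNNING"
        self.legacy_handler = legacy_handler  # deprecated-style handler

    def on_stream_thread_death(self, exc):
        if self.legacy_handler is not None:
            # Deprecated path: the user handler runs, but the instance-level
            # state transition is bypassed entirely.
            self.legacy_handler(exc)
        else:
            # New machinery path: thread death drives the instance to ERROR.
            self.state = "ERROR"

zombie = StreamsStateModel(legacy_handler=lambda e: None)
zombie.on_stream_thread_death(IOError("No space left on device"))
# zombie.state still reports "RUNNING" even though every thread is dead

healthy = StreamsStateModel()
healthy.on_stream_thread_death(IOError("No space left on device"))
# healthy.state transitions to "ERROR"
```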
[jira] [Commented] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins
[ https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17402213#comment-17402213 ] Tommy Becker commented on KAFKA-12317: -- At least in the case of left stream-globalTable joins, this is how it used to work prior to 2.7/KAFKA-10277. I agree that it would be easier to reason about if these operations did not drop any records and relied on filter operations instead. > Relax non-null key requirement for left/outer KStream joins > --- > > Key: KAFKA-12317 > URL: https://issues.apache.org/jira/browse/KAFKA-12317 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Juan C. Gonzalez-Zurita >Priority: Major > > Currently, for a stream-stream and stream-table/globalTable join, > KafkaStreams drops all stream records with a `null`-key (`null`-join-key for > stream-globalTable), because for a `null`-(join)key the join is undefined: > i.e., we don't have an attribute to do the table lookup (we consider the > stream record as malformed). Note that we define the semantics of > _left/outer_ join as: keep the stream record if no matching join record was > found. > We could relax the definition of _left_ stream-table/globalTable and > _left/outer_ stream-stream join though, and not drop `null`-(join)key stream > records, and call the ValueJoiner with a `null` "other-side" value instead: > if the stream record key (or join key) is `null`, we could treat it as a > "failed lookup" instead of treating the stream record as corrupted. > If we make this change, users that want to keep the current behavior can add > a `filter()` before the join to drop `null`-(join)key records from the stream > explicitly. > Note that this change also requires changing the behavior if we insert a > repartition topic before the join: currently, we drop `null`-key records > before writing into the repartition topic (as we know they would be dropped > later anyway). 
We need to relax this behavior for a left stream-table and > left/outer stream-stream join. Users need to be aware (i.e., we might need to > put this into the docs and JavaDocs) that records with `null`-key would be > partitioned randomly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
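The `filter()` workaround suggested in the description above could be modeled like this (illustrative Python, not the Streams DSL):

```python
# A stream with one null-key record that the relaxed join semantics would
# now pass to the ValueJoiner instead of dropping.
stream = [("k1", "v1"), (None, "v2"), ("k3", "v3")]

# Equivalent of adding `.filter((k, v) -> k != null)` before the join, for
# users who want to keep today's behavior of dropping null-key records:
filtered = [(k, v) for k, v in stream if k is not None]
```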
[jira] [Commented] (KAFKA-13197) KStream-GlobalKTable join semantics don't match documentation
[ https://issues.apache.org/jira/browse/KAFKA-13197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17402211#comment-17402211 ] Tommy Becker commented on KAFKA-13197: -- Hey [~guozhang] thanks for the response. To be clear, I'm looking at {{KStream.leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)}} as shown [here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java#L2964] and similar signatures, which still have the (incorrect) verbiage I quoted. It seems from the churn in this area that there is a need to allow both null stream key/value records as well as non-null stream-side records that map to null join keys, though my use-case and this issue are specifically about the latter, which used to work prior to 2.7. > KStream-GlobalKTable join semantics don't match documentation > - > > Key: KAFKA-13197 > URL: https://issues.apache.org/jira/browse/KAFKA-13197 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.7.0 >Reporter: Tommy Becker >Priority: Major > > As part of KAFKA-10277, the behavior of KStream-GlobalKTable joins was > changed. It appears the change was intended to merely relax a requirement but > it actually broke backwards compatibility. Although it does allow {{null}} > keys and values in the KStream to be joined, it now excludes {{null}} results > of the {{KeyValueMapper}}. We have an application which can return {{null}} > from the {{KeyValueMapper}} for non-null keys in the KStream, and relies on > these nulls being passed to the {{ValueJoiner}}. Indeed the javadoc still > explicitly says this is done: > {quote}If a KStream input record key or value is null the record will not be > included in the join operation and thus no output record will be added to the > resulting KStream. > If keyValueMapper returns null implying no match exists, a null value will > be provided to ValueJoiner. > {quote} > Both these statements are incorrect. 
> I think the new behavior is worse than the previous/documented behavior. It > feels more reasonable to have a non-null stream record map to a null join key > (our use-case is event-enhancement where the incoming record doesn't have the > join field), than the reverse. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13197) KStream-GlobalKTable join semantics don't match documentation
Tommy Becker created KAFKA-13197: Summary: KStream-GlobalKTable join semantics don't match documentation Key: KAFKA-13197 URL: https://issues.apache.org/jira/browse/KAFKA-13197 Project: Kafka Issue Type: Bug Affects Versions: 2.7.0 Reporter: Tommy Becker As part of KAFKA-10277, the behavior of KStream-GlobalKTable joins was changed. It appears the change was intended to merely relax a requirement but it actually broke backwards compatibility. Although it does allow {{null}} keys and values in the KStream to be joined, it now excludes {{null}} results of the {{KeyValueMapper}}. We have an application which can return {{null}} from the {{KeyValueMapper}} for non-null keys in the KStream, and relies on these nulls being passed to the {{ValueJoiner}}. Indeed the javadoc still explicitly says this is done: {quote}If a KStream input record key or value is null the record will not be included in the join operation and thus no output record will be added to the resulting KStream. If keyValueMapper returns null implying no match exists, a null value will be provided to ValueJoiner. {quote} Both these statements are incorrect. I think the new behavior is worse than the previous/documented behavior. It feels more reasonable to have a non-null stream record map to a null join key (our use-case is event-enhancement where the incoming record doesn't have the join field), than the reverse. -- This message was sent by Atlassian Jira (v8.3.4#803005)
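The *documented* semantics quoted above can be sketched as a small model (illustrative Python; function and variable names are hypothetical, not the Streams API; the post-KAFKA-10277 behavior instead drops records whose mapped join key is null):

```python
def left_join_documented(stream_records, table, key_value_mapper, value_joiner):
    """Model of the documented KStream-GlobalKTable leftJoin semantics."""
    out = []
    for key, value in stream_records:
        if key is None or value is None:
            continue  # documented: null stream key/value records are dropped
        join_key = key_value_mapper(key, value)
        if join_key is None:
            # documented: null join key implies no match, so the ValueJoiner
            # is called with a null "other side" value
            out.append((key, value_joiner(value, None)))
        else:
            out.append((key, value_joiner(value, table.get(join_key))))
    return out

table = {"j1": "enrichment"}
# k2 lacks the join field, as in the event-enhancement use-case above
records = [("k1", {"join_field": "j1"}), ("k2", {})]
mapper = lambda k, v: v.get("join_field")
joiner = lambda v, other: (v, other)

result = left_join_documented(records, table, mapper, joiner)
# documented behavior keeps k2 paired with a null table value;
# the post-2.7 behavior drops k2 entirely
```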
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167485#comment-17167485 ] Tommy Becker commented on KAFKA-10324: -- Understood, but it seems like as your fetch offset approaches the end of a segment you'll get a smaller batch. Just pointing out that scanning ahead to grab as many records as will fit into the max fetch size could be a general, albeit small, improvement across the board. > Pre-0.11 consumers can get stuck when messages are downconverted from V2 > format > --- > > Key: KAFKA-10324 > URL: https://issues.apache.org/jira/browse/KAFKA-10324 > Project: Kafka > Issue Type: Bug >Reporter: Tommy Becker >Priority: Major > > As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset > even if that offset gets removed due to log compaction. If a pre-0.11 > consumer seeks to such an offset and issues a fetch, it will get an empty > batch, since offsets prior to the requested one are filtered out during > down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch > offset in this case, but this leaves old consumers unable to consume these > topics. > The exact behavior varies depending on consumer version. The 0.10.0.0 > consumer throws RecordTooLargeException and dies, believing that the record > must not have been returned because it was too large. The 0.10.1.0 consumer > simply spins fetching the same empty batch over and over. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167480#comment-17167480 ] Tommy Becker commented on KAFKA-10324: -- So is it always the case that records returned in a FetchResponse are all from a single segment? Might that not be inefficient? > Pre-0.11 consumers can get stuck when messages are downconverted from V2 > format > --- > > Key: KAFKA-10324 > URL: https://issues.apache.org/jira/browse/KAFKA-10324 > Project: Kafka > Issue Type: Bug >Reporter: Tommy Becker >Priority: Major > > As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset > even if that offset gets removed due to log compaction. If a pre-0.11 > consumer seeks to such an offset and issues a fetch, it will get an empty > batch, since offsets prior to the requested one are filtered out during > down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch > offset in this case, but this leaves old consumers unable to consume these > topics. > The exact behavior varies depending on consumer version. The 0.10.0.0 > consumer throws RecordTooLargeException and dies, believing that the record > must not have been returned because it was too large. The 0.10.1.0 consumer > simply spins fetching the same empty batch over and over. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167431#comment-17167431 ] Tommy Becker commented on KAFKA-10324: -- Yes. See my comment above. > Pre-0.11 consumers can get stuck when messages are downconverted from V2 > format > --- > > Key: KAFKA-10324 > URL: https://issues.apache.org/jira/browse/KAFKA-10324 > Project: Kafka > Issue Type: Bug >Reporter: Tommy Becker >Priority: Major > > As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset > even if that offset gets removed due to log compaction. If a pre-0.11 > consumer seeks to such an offset and issues a fetch, it will get an empty > batch, since offsets prior to the requested one are filtered out during > down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch > offset in this case, but this leaves old consumers unable to consume these > topics. > The exact behavior varies depending on consumer version. The 0.10.0.0 > consumer throws RecordTooLargeException and dies, believing that the record > must not have been returned because it was too large. The 0.10.1.0 consumer > simply spins fetching the same empty batch over and over. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167400#comment-17167400 ] Tommy Becker edited comment on KAFKA-10324 at 7/29/20, 5:54 PM: [~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm not sure. But I can tell you I see this behavior even with max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to do with down conversion? Anyway, here's an excerpt from a dump of the segment containing the problematic offset, which is 13920987: baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 isTransactional: false isControl: false position: 98516844 CreateTime: 1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: true \| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: [] \| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: [] \| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: [] \| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: [] \| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: [] \| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: [] End of segment is here was (Author: twbecker): [~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm not sure. But I can tell you I see this behavior even with max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to do with down conversion? 
Anyway, here's an excerpt from a dump of the segment containing the problematic offset, which is 13920987: baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 isTransactional: false isControl: false position: 98516844 CreateTime: 1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: true | offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: [] | offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: [] | offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: [] | offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: [] | offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: [] | offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: [] End of segment is here > Pre-0.11 consumers can get stuck when messages are downconverted from V2 > format > --- > > Key: KAFKA-10324 > URL: https://issues.apache.org/jira/browse/KAFKA-10324 > Project: Kafka > Issue Type: Bug >Reporter: Tommy Becker >Priority: Major > > As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset > even if that offset gets removed due to log compaction. If a pre-0.11 > consumer seeks to such an offset and issues a fetch, it will get an empty > batch, since offsets prior to the requested one are filtered out during > down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch > offset in this case, but this leaves old consumers unable to consume these > topics. > The exact behavior varies depending on consumer version. The 0.10.0.0 > consumer throws RecordTooLargeException and dies, believing that the record > must not have been returned because it was too large. 
The 0.10.1.0 consumer > simply spins fetching the same empty batch over and over. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167400#comment-17167400 ] Tommy Becker commented on KAFKA-10324: -- [~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm not sure. But I can tell you I see this behavior even with max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to do with down conversion? Anyway, here's an excerpt from a dump of the segment containing the problematic offset, which is 13920987: baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 isTransactional: false isControl: false position: 98516844 CreateTime: 1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: true | offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: [] | offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: [] | offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: [] | offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: [] | offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: [] | offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: [] ### End of segment is here > Pre-0.11 consumers can get stuck when messages are downconverted from V2 > format > --- > > Key: KAFKA-10324 > URL: https://issues.apache.org/jira/browse/KAFKA-10324 > Project: Kafka > Issue Type: Bug >Reporter: Tommy Becker >Priority: Major > > As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset > even if that offset gets removed due to log compaction. 
If a pre-0.11 > consumer seeks to such an offset and issues a fetch, it will get an empty > batch, since offsets prior to the requested one are filtered out during > down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch > offset in this case, but this leaves old consumers unable to consume these > topics. > The exact behavior varies depending on consumer version. The 0.10.0.0 > consumer throws RecordTooLargeException and dies, believing that the record > must not have been returned because it was too large. The 0.10.1.0 consumer > simply spins fetching the same empty batch over and over. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167371#comment-17167371 ] Tommy Becker commented on KAFKA-10324: -- Thanks for the response [~ijuma]. Yes, these are Java consumers, and I agree it's odd that this has not been found before now. I have limited familiarity with the code base, so it's possible I'm missing something, but I believe the issue is as I described. In my tests I'm trying to consume a 25GB topic and have found 2 distinct offsets which the consumer cannot advance beyond, and they are both cases where the offset:
# Is the lastOffset in the last batch of its log segment
# Does not actually exist, presumably due to log compaction.
> Pre-0.11 consumers can get stuck when messages are downconverted from V2 > format > --- > > Key: KAFKA-10324 > URL: https://issues.apache.org/jira/browse/KAFKA-10324 > Project: Kafka > Issue Type: Bug >Reporter: Tommy Becker >Priority: Major > > As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset > even if that offset gets removed due to log compaction. If a pre-0.11 > consumer seeks to such an offset and issues a fetch, it will get an empty > batch, since offsets prior to the requested one are filtered out during > down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch > offset in this case, but this leaves old consumers unable to consume these > topics. > The exact behavior varies depending on consumer version. The 0.10.0.0 > consumer throws RecordTooLargeException and dies, believing that the record > must not have been returned because it was too large. The 0.10.1.0 consumer > simply spins fetching the same empty batch over and over. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167357#comment-17167357 ] Tommy Becker commented on KAFKA-10324: -- We have some legacy applications whose consumer versions are not easily upgraded hitting this issue, and it's hard to diagnose since the consumers do not give a proper message (or indeed any message in the case of the 0.10.1.0 consumer) and since it is dependent on the way messages are batched, which is opaque to clients. > Pre-0.11 consumers can get stuck when messages are downconverted from V2 > format > --- > > Key: KAFKA-10324 > URL: https://issues.apache.org/jira/browse/KAFKA-10324 > Project: Kafka > Issue Type: Bug >Reporter: Tommy Becker >Priority: Major > > As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset > even if that offset gets removed due to log compaction. If a pre-0.11 > consumer seeks to such an offset and issues a fetch, it will get an empty > batch, since offsets prior to the requested one are filtered out during > down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch > offset in this case, but this leaves old consumers unable to consume these > topics. > The exact behavior varies depending on consumer version. The 0.10.0.0 > consumer throws RecordTooLargeException and dies, believing that the record > must not have been returned because it was too large. The 0.10.1.0 consumer > simply spins fetching the same empty batch over and over. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
Tommy Becker created KAFKA-10324: Summary: Pre-0.11 consumers can get stuck when messages are downconverted from V2 format Key: KAFKA-10324 URL: https://issues.apache.org/jira/browse/KAFKA-10324 Project: Kafka Issue Type: Bug Reporter: Tommy Becker As noted in KAFKA-5443, the V2 message format preserves a batch's lastOffset even if that offset gets removed due to log compaction. If a pre-0.11 consumer seeks to such an offset and issues a fetch, it will get an empty batch, since offsets prior to the requested one are filtered out during down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch offset in this case, but this leaves old consumers unable to consume these topics. The exact behavior varies depending on consumer version. The 0.10.0.0 consumer throws RecordTooLargeException and dies, believing that the record must not have been returned because it was too large. The 0.10.1.0 consumer simply spins fetching the same empty batch over and over. -- This message was sent by Atlassian Jira (v8.3.4#803005)
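The consumer-side advance logic mentioned above (added by KAFKA-5443, and missing from pre-0.11 clients) can be sketched as follows. This is an illustrative simulation, not Kafka's actual code; the Batch shape and the offsets are hypothetical.

```java
// Illustrative sketch: why a post-KAFKA-5443 consumer escapes an empty
// downconverted batch while a pre-0.11 consumer spins forever.
import java.util.List;

public class EmptyBatchSketch {
    // A V2 batch keeps its lastOffset even after compaction removes records.
    record Batch(long baseOffset, long lastOffset, List<Long> liveOffsets) {}

    // Post-0.11 behavior: if the fetched batch yields no records at or after
    // the requested position, advance the position to lastOffset + 1.
    // A pre-0.11 consumer has no such logic and re-fetches the same position.
    static long nextPosition(Batch fetched, long position) {
        boolean anyUsable = fetched.liveOffsets().stream().anyMatch(o -> o >= position);
        return anyUsable ? position : fetched.lastOffset() + 1;
    }

    public static void main(String[] args) {
        // Offset 99 was the batch's lastOffset, but its record was compacted away.
        Batch tail = new Batch(90, 99, List.of(90L, 95L));
        long stuckPosition = 99; // a pre-0.11 consumer fetches 99 forever
        System.out.println(nextPosition(tail, stuckPosition)); // 100: past the vanished lastOffset
    }
}
```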
[jira] [Comment Edited] (KAFKA-7663) Custom Processor supplied on addGlobalStore is not used when restoring state from topic
[ https://issues.apache.org/jira/browse/KAFKA-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983797#comment-16983797 ] Tommy Becker edited comment on KAFKA-7663 at 11/27/19 6:25 PM: --- Thanks for the comment [~vvcephei] . In the reporter's defense, I'm pretty sure that javadoc was added after this bug was opened ;) This is a pretty important use-case for us. The topic we build the global store from is heterogeneous; we only need some types of records and need to rekey others prior to joining to our event stream. Personally, I feel like your second option (i.e. maintaining a real changelog topic) is probably overkill and simply running the records through the processors during restoration is adequate. Seems like we could even keep the current "fast path" restoration that bypasses serde when there is no custom processor configured. With respect to your recommended work-around, the problem with that is that if I'm not mistaken you lose the critical property of the global store being fully populated when the rest of the topology starts up. Because although the store will wait for "my-global-changelog" topic to be caught up, that doesn't mean much since that topic is itself fed from the main topology. was (Author: twbecker): Thanks for the comment [~vvcephei] . In the reporter's defense, I'm pretty sure that javadoc was added before this bug was opened ;) This is a pretty important use-case for us. The topic we build the global store from is heterogeneous; we only need some types of records and need to rekey others prior to joining to our event stream. Personally, I feel like your second option (i.e. maintaining a real changelog topic) is probably overkill and simply running the records through the processors during restoration is adequate. Seems like we could even keep the current "fast path" restoration that bypasses serde when there is no custom processor configured. 
With respect to your recommended work-around, the problem with that is that if I'm not mistaken you lose the critical property of the global store being fully populated when the rest of the topology starts up. Because although the store will wait for "my-global-changelog" topic to be caught up, that doesn't mean much since that topic is itself fed from the main topology. > Custom Processor supplied on addGlobalStore is not used when restoring state > from topic > --- > > Key: KAFKA-7663 > URL: https://issues.apache.org/jira/browse/KAFKA-7663 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Frederic Tardif >Priority: Major > Attachments: image-2018-11-20-11-42-14-697.png > > > I have implemented a StreamBuilder#{{addGlobalStore}} supplying a custom > processor responsible to transform a K,V record from the input stream into a > V,K records. It works fine and my {{store.all()}} does print the correct > persisted V,K records. However, if I clean the local store and restart the > stream app, the global table is reloaded but without going through the > processor supplied; instead, it calls {{GlobalStateManagerImp#restoreState}} > which simply stores the input topic K,V records into rocksDB (hence bypassing > the mapping function of my custom processor). I believe this must not be the > expected result? > This is a follow up on stackoverflow discussion around storing a K,V topic > as a global table with some stateless transformations based on a "custom" > processor added on the global store: > [https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729] > If we address this issue, we should also apply > `default.deserialization.exception.handler` during restore (cf. KAFKA-8037) > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7663) Custom Processor supplied on addGlobalStore is not used when restoring state from topic
[ https://issues.apache.org/jira/browse/KAFKA-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983797#comment-16983797 ] Tommy Becker commented on KAFKA-7663: - Thanks for the comment [~vvcephei]. In the reporter's defense, I'm pretty sure that javadoc was added before this bug was opened ;) This is a pretty important use-case for us. The topic we build the global store from is heterogeneous; we only need some types of records and need to rekey others prior to joining to our event stream. Personally, I feel like your second option (i.e. maintaining a real changelog topic) is probably overkill, and simply running the records through the processors during restoration is adequate. Seems like we could even keep the current "fast path" restoration that bypasses serde when there is no custom processor configured. With respect to your recommended work-around, the problem is that, if I'm not mistaken, you lose the critical property of the global store being fully populated when the rest of the topology starts up: although the store will wait for the "my-global-changelog" topic to be caught up, that doesn't mean much, since that topic is itself fed from the main topology. > Custom Processor supplied on addGlobalStore is not used when restoring state > from topic > --- > > Key: KAFKA-7663 > URL: https://issues.apache.org/jira/browse/KAFKA-7663 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Frederic Tardif >Priority: Major > Attachments: image-2018-11-20-11-42-14-697.png > > > I have implemented a StreamBuilder#{{addGlobalStore}} supplying a custom > processor responsible to transform a K,V record from the input stream into a > V,K records. It works fine and my {{store.all()}} does print the correct > persisted V,K records. 
However, if I clean the local store and restart the > stream app, the global table is reloaded but without going through the > processor supplied; instead, it calls {{GlobalStateManagerImp#restoreState}} > which simply stores the input topic K,V records into rocksDB (hence bypassing > the mapping function of my custom processor). I believe this must not be the > expected result? > This is a follow up on stackoverflow discussion around storing a K,V topic > as a global table with some stateless transformations based on a "custom" > processor added on the global store: > [https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729] > If we address this issue, we should also apply > `default.deserialization.exception.handler` during restore (cf. KAFKA-8037) > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8143) Kafka-Streams GlobalStore cannot be read after application restart
[ https://issues.apache.org/jira/browse/KAFKA-8143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798573#comment-16798573 ] Tommy Becker commented on KAFKA-8143: - I think this is possibly due to https://issues.apache.org/jira/browse/KAFKA-7663, which renders addGlobalStore() useless for its intended purpose. > Kafka-Streams GlobalStore cannot be read after application restart > -- > > Key: KAFKA-8143 > URL: https://issues.apache.org/jira/browse/KAFKA-8143 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.1 >Reporter: Luke Stephenson >Priority: Major > > I've created a small example application which has a trivial `Processor` > which takes messages and stores the length of the String value rather than > the value itself. > That is, the following setup: > {code:java} > Topic[String, String] > Processor[String, String] > KeyValueStore[String, Long] // Note the Store persists Long values > {code} > > The example application also has a Thread which periodically displays all > values in the KeyValueStore. > While the application is run, I can publish values to the topic with: > {code:java} > root@kafka:/opt/kafka# bin/kafka-console-producer.sh --property > "parse.key=true" --property "key.separator=:" --broker-list localhost:9092 > --topic test.topic > >1:hello > >2:abc{code} > And the background Thread will report the values persisted to the key value > store. > If the application is restarted, when attempting to read from the > KeyValueStore it will fail. It attempts to recover the state from the > persistent RocksDB store which fails with: > {code:java} > org.apache.kafka.common.errors.SerializationException: Size of data received > by LongDeserializer is not 8{code} > (Note there is no stack trace as SerializationException has disabled it.) > Debugging appears to reveal that the original data from the Topic is being > restored rather than what was modified by the processor. 
> I've created a minimal example to show the issue at > [https://github.com/lukestephenson/kafka-streams-example] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7872) Consider allowing transformations on GlobalKTable before materialization
[ https://issues.apache.org/jira/browse/KAFKA-7872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16753989#comment-16753989 ] Tommy Becker commented on KAFKA-7872: - [~mjsax] Yes I believe it is. > Consider allowing transformations on GlobalKTable before materialization > > > Key: KAFKA-7872 > URL: https://issues.apache.org/jira/browse/KAFKA-7872 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > > Current GlobalKTable's interface does not allow any transformations on itself: > {code} > public interface GlobalKTable { > /** > * Get the name of the local state store that can be used to query this > {@code GlobalKTable}. > * > * @return the underlying state store name, or {@code null} if this > {@code GlobalKTable} cannot be queried. > */ > String queryableStoreName(); > } > {code} > This limits use cases such that users want to read the source topic, and do > some filtering / value transformations / reformatting etc before materialize > it to the backing state stores for further operations like joins. On the > other hand, for KTable today we already allow the source KTable to be > filtered / value-mapped etc which would, behind the scene, apply those > applications on the fly before materializing to the state stores. > We should consider adding such functionalities for GlobalKTable as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7397) Ability to apply DSL stateless transformation on a global table
[ https://issues.apache.org/jira/browse/KAFKA-7397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16753988#comment-16753988 ] Tommy Becker commented on KAFKA-7397: - It's also worth noting that using the given workaround, where you have a separate streams app pre-processing the global table backing topic, you effectively lose the ability to fully bootstrap the table before proceeding with your main processing logic. > Ability to apply DSL stateless transformation on a global table > --- > > Key: KAFKA-7397 > URL: https://issues.apache.org/jira/browse/KAFKA-7397 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Frederic Tardif >Priority: Major > Labels: needs-kip > Attachments: kafka.zip > > > When consuming a globalKTable (with the expectation of caching all the data > of a topic in a consumer store), we can't apply any stateless transformation > (filter, map) prior to materializing. To achieve this, while ensuring to > consume the records of all the partitions, we must first run a stream app > that does preprocessing on the ingress topic into an exact K1,V1 egress topic > as we want to store in our GlobalKTable. This looks unnecessarily complex, > and doubles the storage of the topic, while the only goal is to > statelessly adapt the data prior to storing (RocksDB) at the receiving end. > See discussion on > :[https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic] > As a workaround, I have used `new Builder().addGlobalStore()` with a > Custom Processor able to filter and map prior to store (see attached). > Although this seems to work, I believe this functionality should be part of > the basic DSL API when working with a globalTable (`new > StreamsBuilder().globalTable().filter(...).map()... `). > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7663) Custom Processor supplied on addGlobalStore is not used when restoring state from topic
[ https://issues.apache.org/jira/browse/KAFKA-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16753984#comment-16753984 ] Tommy Becker commented on KAFKA-7663: - I hit this as well; it really makes {{Topology#addGlobalStore()}} close to useless since presumably the only reason you are using a custom processor to maintain the store is because you do not want the default behavior of simply materializing the topic as-is. In this particular instance I was able to work around this by iterating the store in the custom processor's {{init()}} method and making some changes, though it's quite nasty. > Custom Processor supplied on addGlobalStore is not used when restoring state > from topic > --- > > Key: KAFKA-7663 > URL: https://issues.apache.org/jira/browse/KAFKA-7663 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Frederic Tardif >Priority: Major > Attachments: image-2018-11-20-11-42-14-697.png > > > I have implemented a StreamBuilder#{{addGlobalStore}} supplying a custom > processor responsible to transform a K,V record from the input stream into a > V,K records. It works fine and my {{store.all()}} does print the correct > persisted V,K records. However, if I clean the local store and restart the > stream app, the global table is reloaded but without going through the > processor supplied; instead, it calls {{GlobalStateManagerImp#restoreState}} > which simply stores the input topic K,V records into rocksDB (hence bypassing > the mapping function of my custom processor). I believe this must not be the > expected result? > > this is a follow up on stackoverflow discussion around storing a K,V topic as > a global table with some stateless transformations based on a "custom" > processor added on the global store: > [https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729] > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
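The mismatch described above, and the init()-time repair work-around, can be simulated with plain maps. This is a hypothetical sketch, not Kafka Streams code: the live path runs records through a K,V -> V,K mapping (standing in for the custom processor), while restoration copies topic records verbatim, so the two stores disagree until the store is rewritten.

```java
// Hypothetical simulation of the restore bug: restoration bypasses the
// custom processor's K,V -> V,K mapping, so the restored store disagrees
// with one maintained through the processor. The final step mirrors the
// init()-time repair pass described in the comment above.
import java.util.HashMap;
import java.util.Map;

public class GlobalStoreRestoreSketch {
    // The custom processor's mapping: store each record as value -> key.
    static Map<String, String> invert(Map<String, String> records) {
        Map<String, String> out = new HashMap<>();
        records.forEach((k, v) -> out.put(v, k));
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> topic = Map.of("k1", "v1", "k2", "v2");

        Map<String, String> live = invert(topic);            // processor applied
        Map<String, String> restored = new HashMap<>(topic); // the bug: verbatim copy
        System.out.println(live.equals(restored));           // false

        // Work-around: re-apply the mapping over the store during init().
        Map<String, String> repaired = invert(restored);
        System.out.println(live.equals(repaired));           // true
    }
}
```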
[jira] [Commented] (KAFKA-7075) Allow Topology#addGlobalStore to add a window store
[ https://issues.apache.org/jira/browse/KAFKA-7075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16745164#comment-16745164 ] Tommy Becker commented on KAFKA-7075: - [~lmontrieux] I'm curious what your use-case is. After experimenting with addGlobalStore(), I'm not sure what value it has. At first glance it seems to allow you to plug in a custom processor that allows you to maintain your global store however you wish. But the initial population of the store does not go through this processor, so if you want to do anything beyond loading the exact contents of the topic, you're out of luck. This method actually seems dangerous because if your custom processor does not use the keys from the topic verbatim, you'll get all manner of weird behavior. > Allow Topology#addGlobalStore to add a window store > --- > > Key: KAFKA-7075 > URL: https://issues.apache.org/jira/browse/KAFKA-7075 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Nishanth Pradeep >Priority: Major > Labels: newbie > > Today although {{Topology#addGlobalStore}} can take any {{StateStore}} types, > the internal implementation {{InternalTopologyBuilder#addGlobalStore}} only > accepts {{StoreBuilder}}. It means if users pass in a windowed > store builder in {{Topology#addGlobalStore}} it will cause a runtime > ClassCastException. > We should fix this issue by relaxing the {{InternalTopologyBuilder}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7075) Allow Topology#addGlobalStore to add a window store
[ https://issues.apache.org/jira/browse/KAFKA-7075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16743104#comment-16743104 ] Tommy Becker commented on KAFKA-7075: - Any update on this? Would be great to get this in. > Allow Topology#addGlobalStore to add a window store > --- > > Key: KAFKA-7075 > URL: https://issues.apache.org/jira/browse/KAFKA-7075 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Nishanth Pradeep >Priority: Major > Labels: newbie > > Today although {{Topology#addGlobalStore}} can take any {{StateStore}} types, > the internal implementation {{InternalTopologyBuilder#addGlobalStore}} only > accepts {{StoreBuilder}}. It means if users pass in a windowed > store builder in {{Topology#addGlobalStore}} it will cause a runtime > ClassCastException. > We should fix this issue by relaxing the {{InternalTopologyBuilder}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-7700) AbstractConfig does not honor Properties defaults
[ https://issues.apache.org/jira/browse/KAFKA-7700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tommy Becker reassigned KAFKA-7700: --- Assignee: (was: Tommy Becker) > AbstractConfig does not honor Properties defaults > - > > Key: KAFKA-7700 > URL: https://issues.apache.org/jira/browse/KAFKA-7700 > Project: Kafka > Issue Type: Bug > Components: config >Affects Versions: 2.1.0 >Reporter: Tommy Becker >Priority: Minor > > Kafka clients such as the Consumer and Producer require various configuration > parameters to work. In the case of the Consumer and Producer, these > parameters are provided by passing either a {{Map}} or > {{Properties}} instance to the respective constructor. > {{Properties}} is a legacy class (akin to {{Vector}}) that adds no value > above {{Map}} other than the ability to wrap another > {{Properties}} instance that provides defaults. But Kafka negates this > benefit by treating the {{Properties}} instance as a {{Map}}, which only > works due to an unfortunate decision to have {{Properties}} extend > {{Hashtable}}. Such treatment bypasses the defaults because they are only > consulted by {{Properties.getProperty()}}. The net result is that when > creating Kafka clients via {{Properties}}, any configuration from its > defaults is ignored. > This has been reported several times over the years as KAFKA-1909, > KAFKA-2184, KAFKA-3049, and KAFKA-5514. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7700) AbstractConfig does not honor Properties defaults
[ https://issues.apache.org/jira/browse/KAFKA-7700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707618#comment-16707618 ] Tommy Becker commented on KAFKA-7700: - One option here would be to deprecate the use of {{Properties}} for configuring these classes, though such an approach would require a KIP. Alternatively we could probably add support for defaults in {{AbstractConfig}}. > AbstractConfig does not honor Properties defaults > - > > Key: KAFKA-7700 > URL: https://issues.apache.org/jira/browse/KAFKA-7700 > Project: Kafka > Issue Type: Bug > Components: config >Affects Versions: 2.1.0 >Reporter: Tommy Becker >Assignee: Tommy Becker >Priority: Minor > > Kafka clients such as the Consumer and Producer require various configuration > parameters to work. In the case of the Consumer and Producer, these > parameters are provided by passing either a {{Map}} or > {{Properties}} instance to the respective constructor. > {{Properties}} is a legacy class (akin to {{Vector}}) that adds no value > above {{Map}} other than the ability to wrap another > {{Properties}} instance that provides defaults. But Kafka negates this > benefit by treating the {{Properties}} instance as a {{Map}}, which only > works due to an unfortunate decision to have {{Properties}} extend > {{Hashtable}}. Such treatment bypasses the defaults because they are only > consulted by {{Properties.getProperty()}}. The net result is that when > creating Kafka clients via {{Properties}}, any configuration from its > defaults is ignored. > This has been reported several times over the years as KAFKA-1909, > KAFKA-2184, KAFKA-3049, and KAFKA-5514. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7700) AbstractConfig does not honor Properties defaults
Tommy Becker created KAFKA-7700: --- Summary: AbstractConfig does not honor Properties defaults Key: KAFKA-7700 URL: https://issues.apache.org/jira/browse/KAFKA-7700 Project: Kafka Issue Type: Bug Components: config Affects Versions: 2.1.0 Reporter: Tommy Becker Assignee: Tommy Becker Kafka clients such as the Consumer and Producer require various configuration parameters to work. In the case of the Consumer and Producer, these parameters are provided by passing either a {{Map}} or {{Properties}} instance to the respective constructor. {{Properties}} is a legacy class (akin to {{Vector}}) that adds no value above {{Map}} other than the ability to wrap another {{Properties}} instance that provides defaults. But Kafka negates this benefit by treating the {{Properties}} instance as a {{Map}}, which only works due to an unfortunate decision to have {{Properties}} extend {{Hashtable}}. Such treatment bypasses the defaults because they are only consulted by {{Properties.getProperty()}}. The net result is that when creating Kafka clients via {{Properties}}, any configuration from its defaults is ignored. This has been reported several times over the years as KAFKA-1909, KAFKA-2184, KAFKA-3049, and KAFKA-5514. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
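The mechanism of the bug is easy to demonstrate with the standard library alone: defaults wrapped in {{java.util.Properties}} are visible only through {{getProperty()}}, never through the {{Hashtable}}/{{Map}} view ({{get()}}, {{containsKey()}}, {{entrySet()}}) that code treating the instance as a {{Map}} would use. The config keys below are just examples.

```java
// Defaults in java.util.Properties are consulted only by getProperty();
// any code that reads the instance through its Hashtable/Map view never
// sees them, which is exactly how the defaults get dropped.
import java.util.Properties;

public class PropertiesDefaultsSketch {
    public static void main(String[] args) {
        Properties defaults = new Properties();
        defaults.setProperty("bootstrap.servers", "localhost:9092");

        Properties config = new Properties(defaults); // wraps the defaults
        config.setProperty("group.id", "my-group");

        System.out.println(config.getProperty("bootstrap.servers")); // localhost:9092
        System.out.println(config.get("bootstrap.servers"));         // null: Map view skips defaults
        System.out.println(config.containsKey("bootstrap.servers")); // false
    }
}
```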
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535220#comment-16535220 ] Tommy Becker commented on KAFKA-4113: - [~mjsax] Has any thought been given to making the strategy for choosing which topic to process from pluggable? I feel like the current timestamp behavior is one such strategy, but for some other use-cases a simple topic-level prioritization would be sufficient. For example, in the case where the table backing topic receives way less traffic than the stream topic, I think it could be reasonable to always prefer messages from the table topic over the stream topic. Such a scheme could work for a lot of cases and is quite a bit easier to reason about and implement. > Allow KTable bootstrap > -- > > Key: KAFKA-4113 > URL: https://issues.apache.org/jira/browse/KAFKA-4113 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Matthias J. Sax >Assignee: Guozhang Wang >Priority: Major > > On the mailing list, there are multiple requests about the possibility to > "fully populate" a KTable before actual stream processing starts. > Even if it is somewhat difficult to define when the initial populating phase > should end, there are multiple possibilities: > The main idea is that there is a rarely updated topic that contains the > data. Only after this topic got read completely and the KTable is ready should the > application start processing. This would indicate that, on startup, > the current partition sizes must be fetched and stored, and after the KTable got > populated up to those offsets, stream processing can start. 
> Other discussed ideas are: > 1) an initial fixed time period for populating > (it might be hard for a user to estimate the correct value) > 2) an "idle" period, ie, if no update to a KTable for a certain time is > done, we consider it as populated > 3) a timestamp cut off point, ie, all records with an older timestamp > belong to the initial populating phase > The API change is not decided yet, and the API design is part of this JIRA. > One suggestion (for option (4)) was: > {noformat} > KTable table = builder.table("topic", 1000); // populate the table without > reading any other topics until see one record with timestamp 1000. > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
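The simple topic-level prioritization suggested in the comment above could look roughly like this. It is a hypothetical sketch, not an existing Kafka Streams API; the class, the record buffers, and the record names are invented for illustration.

```java
// Hypothetical sketch of topic-level prioritization: whenever both topics
// have buffered records, drain the table topic before the stream topic,
// so table updates are applied ahead of the events that join against them.
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PriorityPollSketch {
    // Return the next record to process, always preferring the table topic;
    // null signals that both buffers are drained.
    static String nextRecord(Deque<String> tableBuffer, Deque<String> streamBuffer) {
        return !tableBuffer.isEmpty() ? tableBuffer.poll() : streamBuffer.poll();
    }

    public static void main(String[] args) {
        Deque<String> table = new ArrayDeque<>(List.of("t1", "t2"));
        Deque<String> stream = new ArrayDeque<>(List.of("s1"));
        List<String> order = new ArrayList<>();
        String rec;
        while ((rec = nextRecord(table, stream)) != null) {
            order.add(rec);
        }
        System.out.println(order); // [t1, t2, s1]
    }
}
```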
[jira] [Comment Edited] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16534259#comment-16534259 ] Tommy Becker edited comment on KAFKA-4113 at 7/5/18 11:09 PM: -- Coming from Samza, I find it very surprising that there is no way to do this. If I have 2 topics with existing data, 1 stream and 1 table and write a KafkaStreams application to do a join, it seems very likely that initial records in the result (possible quite many) will not be joined properly, as the corresponding message in the table backing topic has not yet been read. The timestamp semantics makes sense in that I suppose there are some use-cases where you'd consider the value that was current in the table at the time of some incoming message as "better" than the latest value (though I suspect they are a minority). But in reality, the table backing topic is almost certainly log-compacted which means you can't achieve these semantics regardless as these older values are now gone, and worse, the new values have newer timestamps which perpetuate the problem we're talking about. was (Author: twbecker): Coming from Samza, I find it very surprising that there is no way to do this. If I have 2 topics with existing data, 1 stream and 1 table and write a KafkaStreams application to do a join, it seems very likely that initial records in the stream (possible quite many) will not be joined properly, as the corresponding message in the table backing topic has not yet been read. The timestamp semantics makes sense in that I suppose there are some use-cases where you'd consider the value that was current in the table at the time of some incoming message as "better" than the latest value (though I suspect they are a minority). 
But in reality, the table backing topic is almost certainly log-compacted which means you can't achieve these semantics regardless as these older values are now gone, and worse, the new values have newer timestamps which perpetuate the problem we're talking about. > Allow KTable bootstrap > -- > > Key: KAFKA-4113 > URL: https://issues.apache.org/jira/browse/KAFKA-4113 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Matthias J. Sax >Assignee: Guozhang Wang >Priority: Major > > On the mailing list, there are multiple request about the possibility to > "fully populate" a KTable before actual stream processing start. > Even if it is somewhat difficult to define, when the initial populating phase > should end, there are multiple possibilities: > The main idea is, that there is a rarely updated topic that contains the > data. Only after this topic got read completely and the KTable is ready, the > application should start processing. This would indicate, that on startup, > the current partition sizes must be fetched and stored, and after KTable got > populated up to those offsets, stream processing can start. > Other discussed ideas are: > 1) an initial fixed time period for populating > (it might be hard for a user to estimate the correct value) > 2) an "idle" period, ie, if no update to a KTable for a certain time is > done, we consider it as populated > 3) a timestamp cut off point, ie, all records with an older timestamp > belong to the initial populating phase > The API change is not decided yet, and the API desing is part of this JIRA. > One suggestion (for option (4)) was: > {noformat} > KTable table = builder.table("topic", 1000); // populate the table without > reading any other topics until see one record with timestamp 1000. > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16534259#comment-16534259 ] Tommy Becker edited comment on KAFKA-4113 at 7/5/18 11:04 PM: -- Coming from Samza, I find it very surprising that there is no way to do this. If I have 2 topics with existing data, 1 stream and 1 table and write a KafkaStreams application to do a join, it seems very likely that initial records in the stream (possible quite many) will not be joined properly, as the corresponding message in the table backing topic has not yet been read. The timestamp semantics makes sense in that I suppose there are some use-cases where you'd consider the value that was current in the table at the time of some incoming message as "better" than the latest value (though I suspect they are a minority). But in reality, the table backing topic is almost certainly log-compacted which means you can't achieve these semantics regardless as these older values are now gone, and worse, the new values have newer timestamps which perpetuate the problem we're talking about. was (Author: twbecker): Coming from Samza, I find it very surprising that there is no way to do this. If I have 2 topics, 1 stream and 1 table with existing data and write a KafkaStreams application to do a join, it seems very likely that initial records in the stream (possible quite many) will not be joined properly, as the corresponding message in the table backing topic has not yet been read. The timestamp semantics makes sense in that I suppose there are some use-cases where you'd consider the value that was current in the table at the time of some incoming message as "better" than the latest value (though I suspect they are a minority). 
But in reality, the table backing topic is almost certainly log-compacted which means you can't achieve these semantics regardless as these older values are now gone, and worse, the new values have newer timestamps which perpetuate the problem we're talking about. > Allow KTable bootstrap > -- > > Key: KAFKA-4113 > URL: https://issues.apache.org/jira/browse/KAFKA-4113 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Matthias J. Sax >Assignee: Guozhang Wang >Priority: Major > > On the mailing list, there are multiple request about the possibility to > "fully populate" a KTable before actual stream processing start. > Even if it is somewhat difficult to define, when the initial populating phase > should end, there are multiple possibilities: > The main idea is, that there is a rarely updated topic that contains the > data. Only after this topic got read completely and the KTable is ready, the > application should start processing. This would indicate, that on startup, > the current partition sizes must be fetched and stored, and after KTable got > populated up to those offsets, stream processing can start. > Other discussed ideas are: > 1) an initial fixed time period for populating > (it might be hard for a user to estimate the correct value) > 2) an "idle" period, ie, if no update to a KTable for a certain time is > done, we consider it as populated > 3) a timestamp cut off point, ie, all records with an older timestamp > belong to the initial populating phase > The API change is not decided yet, and the API desing is part of this JIRA. > One suggestion (for option (4)) was: > {noformat} > KTable table = builder.table("topic", 1000); // populate the table without > reading any other topics until see one record with timestamp 1000. > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16534259#comment-16534259 ] Tommy Becker commented on KAFKA-4113: - Coming from Samza, I find it very surprising that there is no way to do this. If I have 2 topics, 1 stream and 1 table with existing data, and write a KafkaStreams application to do a join, it seems very likely that initial records in the stream (possibly quite many) will not be joined properly, as the corresponding message in the table backing topic has not yet been read. The timestamp semantics make sense in that I suppose there are some use-cases where you'd consider the value that was current in the table at the time of some incoming message as "better" than the latest value (though I suspect they are a minority). But in reality, the table backing topic is almost certainly log-compacted, which means you can't achieve these semantics regardless, as these older values are now gone; and worse, the new values have newer timestamps, which perpetuates the problem we're talking about. > Allow KTable bootstrap > -- > > Key: KAFKA-4113 > URL: https://issues.apache.org/jira/browse/KAFKA-4113 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Matthias J. Sax >Assignee: Guozhang Wang >Priority: Major > > On the mailing list, there are multiple requests about the possibility to > "fully populate" a KTable before actual stream processing starts. > Even if it is somewhat difficult to define when the initial populating phase > should end, there are multiple possibilities: > The main idea is that there is a rarely updated topic that contains the > data. Only after this topic got read completely and the KTable is ready, the > application should start processing. This would indicate that on startup, > the current partition sizes must be fetched and stored, and after the KTable got > populated up to those offsets, stream processing can start. 
> Other discussed ideas are: > 1) an initial fixed time period for populating > (it might be hard for a user to estimate the correct value) > 2) an "idle" period, ie, if no update to a KTable for a certain time is > done, we consider it as populated > 3) a timestamp cut off point, ie, all records with an older timestamp > belong to the initial populating phase > The API change is not decided yet, and the API design is part of this JIRA. > One suggestion (for option (4)) was: > {noformat} > KTable table = builder.table("topic", 1000); // populate the table without > reading any other topics until seeing one record with timestamp 1000. > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
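The main idea above — capture the table topic's end offsets at startup, populate the KTable up to those offsets, and only then start stream processing — can be sketched in plain Java. This is a hypothetical simulation of the control flow only: the `List`-based "topics", offset bookkeeping, and method names are illustrative stand-ins, not Kafka Streams APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative simulation of the proposed KTable bootstrap: read the table
// "topic" up to the end offset captured at startup, then join the stream.
public class KTableBootstrapSketch {
    // Each record is a {key, value} pair; list index stands in for the offset.
    static List<String> joinAfterBootstrap(List<String[]> tableTopic,
                                           List<String[]> streamTopic) {
        // Step 1: capture the table topic's current end offset at startup.
        int endOffset = tableTopic.size();

        // Step 2: populate the table up to that offset before doing anything else.
        Map<String, String> table = new HashMap<>();
        for (int offset = 0; offset < endOffset; offset++) {
            String[] kv = tableTopic.get(offset);
            table.put(kv[0], kv[1]);
        }

        // Step 3: only now process the stream; lookups can no longer miss
        // values that were already present in the table's backing topic.
        List<String> joined = new ArrayList<>();
        for (String[] kv : streamTopic) {
            joined.add(kv[1] + ":" + table.get(kv[0]));
        }
        return joined;
    }
}
```

Without step 2, the first stream records would be joined against a partially populated table, which is exactly the startup problem described in the comments above.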
[jira] [Commented] (KAFKA-6259) Make KafkaStreams.cleanup() clean global state directory
[ https://issues.apache.org/jira/browse/KAFKA-6259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16262987#comment-16262987 ] Tommy Becker commented on KAFKA-6259: - I literally just noticed this yesterday, and was wondering if it was a deliberate omission. > Make KafkaStreams.cleanup() clean global state directory > > > Key: KAFKA-6259 > URL: https://issues.apache.org/jira/browse/KAFKA-6259 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.2, 1.0, 1.0.1 >Reporter: Damian Guy > > We have {{KafkaStreams#cleanUp}} so that developers can remove all local > state during development, i.e., so they can start from a clean slate. > However, this presently doesn't clean up the global state directory. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6069) Streams metrics tagged incorrectly
Tommy Becker created KAFKA-6069: --- Summary: Streams metrics tagged incorrectly Key: KAFKA-6069 URL: https://issues.apache.org/jira/browse/KAFKA-6069 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.0.0 Reporter: Tommy Becker Assignee: Tommy Becker Priority: Minor KafkaStreams attempts to tag many (all?) of its metrics with the client id. But instead of retrieving the value from the config, it tags them with the literal "client.id", as can be seen at org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java:114 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
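The class of bug described in KAFKA-6069 can be illustrated in isolation. This is not the actual StreamsKafkaClient code; it is a hypothetical sketch where a plain `Map` stands in for the parsed Streams config, showing the difference between tagging a metric with the config *key* versus its configured value.

```java
import java.util.Map;

// Illustrative stand-in for the metric-tagging bug: the tag should carry
// the configured client id, not the literal string "client.id".
public class MetricTagSketch {
    // Buggy pattern: the config key itself ends up as the tag value.
    static String buggyClientIdTag(Map<String, Object> config) {
        return "client.id";
    }

    // Intended pattern: look the value up in the parsed config.
    static String fixedClientIdTag(Map<String, Object> config) {
        return (String) config.get("client.id");
    }
}
```

With a config of `{"client.id": "my-streams-app"}`, the buggy variant tags every metric with `client.id` while the fixed variant tags it with `my-streams-app`, which is the behavior the issue asks for.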
[jira] [Commented] (KAFKA-5886) Introduce delivery.timeout.ms producer config (KIP-91)
[ https://issues.apache.org/jira/browse/KAFKA-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188552#comment-16188552 ] Tommy Becker commented on KAFKA-5886: - [~sutambe] yes, that sounds great, which is why I was hoping this patch would get in ;) The write-up on the KIP page was very helpful and informative. We're struggling a bit to figure out how best to work around this issue currently though. Obviously increasing {{request.timeout.ms}} even more is one option, but 12 already feels quite high; I'm surprised that is not enough to produce to a 3 node cluster with {{acks = all}}. > Introduce delivery.timeout.ms producer config (KIP-91) > -- > > Key: KAFKA-5886 > URL: https://issues.apache.org/jira/browse/KAFKA-5886 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Sumant Tambe >Assignee: Sumant Tambe > Fix For: 1.0.0 > > > We propose adding a new timeout delivery.timeout.ms. The window of > enforcement includes batching in the accumulator, retries, and the inflight > segments of the batch. With this config, the user has a guaranteed upper > bound on when a record will either get sent, fail or expire from the point > when send returns. In other words we no longer overload request.timeout.ms to > act as a weak proxy for accumulator timeout and instead introduce an explicit > timeout that users can rely on without exposing any internals of the producer > such as the accumulator. > See > [KIP-91|https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer] > for more details. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5886) Introduce delivery.timeout.ms producer config (KIP-91)
[ https://issues.apache.org/jira/browse/KAFKA-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16187976#comment-16187976 ] Tommy Becker commented on KAFKA-5886: - Thanks for the response. Though the KIP may be an improvement, I think this behavior, where sends to a perfectly functioning cluster are dropped, is clearly a bug, and I'd argue a major one. We have a use case where we are reading from a local file and producing the records to Kafka, and even with {{request.timeout.ms = 12}} and {{max.in.flight.requests.per.connection = 1}} it is still dropping records. > Introduce delivery.timeout.ms producer config (KIP-91) > -- > > Key: KAFKA-5886 > URL: https://issues.apache.org/jira/browse/KAFKA-5886 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Sumant Tambe >Assignee: Sumant Tambe > Fix For: 1.0.0 > > > We propose adding a new timeout delivery.timeout.ms. The window of > enforcement includes batching in the accumulator, retries, and the inflight > segments of the batch. With this config, the user has a guaranteed upper > bound on when a record will either get sent, fail or expire from the point > when send returns. In other words we no longer overload request.timeout.ms to > act as a weak proxy for accumulator timeout and instead introduce an explicit > timeout that users can rely on without exposing any internals of the producer > such as the accumulator. > See > [KIP-91|https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer] > for more details. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
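The producer settings under discussion might look like the sketch below. The numeric values are illustrative only (the specific timeouts in the comments above are truncated in this archive), and {{delivery.timeout.ms}} only exists once KIP-91 ships; before that, {{request.timeout.ms}} is doing double duty as the accumulator expiry, which is the problem being reported.

```java
import java.util.Properties;

// Illustrative producer configuration of the kind discussed in KAFKA-5886.
// Values are examples, not recommendations from the issue.
public class ProducerTimeoutConfigSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("acks", "all");                                // wait for full ISR
        props.put("max.in.flight.requests.per.connection", "1"); // preserve ordering
        props.put("request.timeout.ms", "120000");               // per-request bound; pre-KIP-91 also a weak accumulator proxy
        props.put("delivery.timeout.ms", "300000");              // end-to-end bound on send() outcome (KIP-91)
        return props;
    }
}
```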
[jira] [Comment Edited] (KAFKA-5886) Introduce delivery.timeout.ms producer config (KIP-91)
[ https://issues.apache.org/jira/browse/KAFKA-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16186303#comment-16186303 ] Tommy Becker edited comment on KAFKA-5886 at 9/29/17 7:51 PM: -- Is this still on track for 1.0? We just hit an issue with records expiring in the accumulator when the producer is heavily loaded. The current behavior of expiring batches even on partitions that are making progress is quite unintuitive, as is the current workaround. was (Author: twbecker): Is this still on track for 1.0? We just hit an issue with records expiring in the accumulator when the producer is heavily loaded. The current behavior of expiring batches even on partitions that are making progress is quite unintuitive, as is the current fix. > Introduce delivery.timeout.ms producer config (KIP-91) > -- > > Key: KAFKA-5886 > URL: https://issues.apache.org/jira/browse/KAFKA-5886 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Sumant Tambe >Assignee: Sumant Tambe > Fix For: 1.0.0 > > > We propose adding a new timeout delivery.timeout.ms. The window of > enforcement includes batching in the accumulator, retries, and the inflight > segments of the batch. With this config, the user has a guaranteed upper > bound on when a record will either get sent, fail or expire from the point > when send returns. In other words we no longer overload request.timeout.ms to > act as a weak proxy for accumulator timeout and instead introduce an explicit > timeout that users can rely on without exposing any internals of the producer > such as the accumulator. > See > [KIP-91|https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer] > for more details. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5886) Introduce delivery.timeout.ms producer config (KIP-91)
[ https://issues.apache.org/jira/browse/KAFKA-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16186303#comment-16186303 ] Tommy Becker commented on KAFKA-5886: - Is this still on track for 1.0? We just hit an issue with records expiring in the accumulator when the producer is heavily loaded. The current behavior of expiring batches even on partitions that are making progress is quite unintuitive, as is the current fix. > Introduce delivery.timeout.ms producer config (KIP-91) > -- > > Key: KAFKA-5886 > URL: https://issues.apache.org/jira/browse/KAFKA-5886 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Sumant Tambe >Assignee: Sumant Tambe > Fix For: 1.0.0 > > > We propose adding a new timeout delivery.timeout.ms. The window of > enforcement includes batching in the accumulator, retries, and the inflight > segments of the batch. With this config, the user has a guaranteed upper > bound on when a record will either get sent, fail or expire from the point > when send returns. In other words we no longer overload request.timeout.ms to > act as a weak proxy for accumulator timeout and instead introduce an explicit > timeout that users can rely on without exposing any internals of the producer > such as the accumulator. > See > [KIP-91|https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer] > for more details. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-5440) Kafka Streams report state RUNNING even if all threads are dead
[ https://issues.apache.org/jira/browse/KAFKA-5440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16127637#comment-16127637 ] Tommy Becker edited comment on KAFKA-5440 at 8/15/17 5:56 PM: -- To answer my own question, from what I can tell the PR for KAFKA-5372 will solve this. It would be nice to get a backport to a patch release before 1.0. was (Author: twbecker): To answer my own question, from what I can tell the PR for KAFKA-5372 will solve this. It would be nice to get a backport to patch release before 1.0. > Kafka Streams report state RUNNING even if all threads are dead > --- > > Key: KAFKA-5440 > URL: https://issues.apache.org/jira/browse/KAFKA-5440 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1, 0.11.0.0 >Reporter: Matthias J. Sax > > From the mailing list: > {quote} > Hi All, > We recently implemented a health check for a Kafka Streams based application. > The health check is simply checking the state of Kafka Streams by calling > KafkaStreams.state(). It reports healthy if it’s not in PENDING_SHUTDOWN or > NOT_RUNNING states. > We truly appreciate having the possibility to easily check the state of Kafka > Streams but to our surprise we noticed that KafkaStreams.state() returns > RUNNING even though all StreamThreads has crashed and reached NOT_RUNNING > state. Is this intended behaviour or is it a bug? Semantically it seems weird > to me that KafkaStreams would say it’s RUNNING when it is in fact not > consuming anything since all underlying working threads has crashed. > If this is intended behaviour I would appreciate an explanation of why that > is the case. Also in that case, how could I determine if the consumption from > Kafka hasn’t crashed? > If this is not intended behaviour, how fast could I expect it to be fixed? I > wouldn’t mind fixing it myself but I’m not sure if this is considered trivial > or big enough to require a JIRA. 
Also, if I would implement a fix I’d like > your input on what would be a reasonable solution. By just inspecting to code > I have an idea but I’m not sure I understand all the implication so I’d be > happy to hear your thoughts first. > {quote} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5440) Kafka Streams report state RUNNING even if all threads are dead
[ https://issues.apache.org/jira/browse/KAFKA-5440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16127637#comment-16127637 ] Tommy Becker commented on KAFKA-5440: - To answer my own question, from what I can tell the PR for KAFKA-5372 will solve this. It would be nice to get a backport to patch release before 1.0. > Kafka Streams report state RUNNING even if all threads are dead > --- > > Key: KAFKA-5440 > URL: https://issues.apache.org/jira/browse/KAFKA-5440 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1, 0.11.0.0 >Reporter: Matthias J. Sax > > From the mailing list: > {quote} > Hi All, > We recently implemented a health check for a Kafka Streams based application. > The health check is simply checking the state of Kafka Streams by calling > KafkaStreams.state(). It reports healthy if it’s not in PENDING_SHUTDOWN or > NOT_RUNNING states. > We truly appreciate having the possibility to easily check the state of Kafka > Streams but to our surprise we noticed that KafkaStreams.state() returns > RUNNING even though all StreamThreads has crashed and reached NOT_RUNNING > state. Is this intended behaviour or is it a bug? Semantically it seems weird > to me that KafkaStreams would say it’s RUNNING when it is in fact not > consuming anything since all underlying working threads has crashed. > If this is intended behaviour I would appreciate an explanation of why that > is the case. Also in that case, how could I determine if the consumption from > Kafka hasn’t crashed? > If this is not intended behaviour, how fast could I expect it to be fixed? I > wouldn’t mind fixing it myself but I’m not sure if this is considered trivial > or big enough to require a JIRA. Also, if I would implement a fix I’d like > your input on what would be a reasonable solution. By just inspecting to code > I have an idea but I’m not sure I understand all the implication so I’d be > happy to hear your thoughts first. 
> {quote} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-5440) Kafka Streams report state RUNNING even if all threads are dead
[ https://issues.apache.org/jira/browse/KAFKA-5440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16127316#comment-16127316 ] Tommy Becker edited comment on KAFKA-5440 at 8/15/17 2:41 PM: -- Any update on this? We're seeing the same behavior, and though there have been some changes in this area in trunk, I don't see anything that would fix this particular problem. org.apache.kafka.streams.KafkaStreams.StreamStateListener, which is called back on the state changes of the thread, still doesn't seem to care about transitions to terminal states. This is quite frustrating because we too were using the KafkaStreams state to determine the health of the topology. was (Author: twbecker): Any update on this? We're seeing the same behavior, and though there have been some changes in this area in trunk, I don't see anything that would fix this particular problem. org.apache.kafka.streams.KafkaStreams.StreamStateListener, which is called back on the state changes of the thread, still doesn't seem to care about transitions to terminal states. > Kafka Streams report state RUNNING even if all threads are dead > --- > > Key: KAFKA-5440 > URL: https://issues.apache.org/jira/browse/KAFKA-5440 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1, 0.11.0.0 >Reporter: Matthias J. Sax > > From the mailing list: > {quote} > Hi All, > We recently implemented a health check for a Kafka Streams based application. > The health check is simply checking the state of Kafka Streams by calling > KafkaStreams.state(). It reports healthy if it’s not in PENDING_SHUTDOWN or > NOT_RUNNING states. > We truly appreciate having the possibility to easily check the state of Kafka > Streams but to our surprise we noticed that KafkaStreams.state() returns > RUNNING even though all StreamThreads has crashed and reached NOT_RUNNING > state. Is this intended behaviour or is it a bug? 
Semantically it seems weird > to me that KafkaStreams would say it’s RUNNING when it is in fact not > consuming anything since all underlying working threads has crashed. > If this is intended behaviour I would appreciate an explanation of why that > is the case. Also in that case, how could I determine if the consumption from > Kafka hasn’t crashed? > If this is not intended behaviour, how fast could I expect it to be fixed? I > wouldn’t mind fixing it myself but I’m not sure if this is considered trivial > or big enough to require a JIRA. Also, if I would implement a fix I’d like > your input on what would be a reasonable solution. By just inspecting to code > I have an idea but I’m not sure I understand all the implication so I’d be > happy to hear your thoughts first. > {quote} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5440) Kafka Streams report state RUNNING even if all threads are dead
[ https://issues.apache.org/jira/browse/KAFKA-5440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16127316#comment-16127316 ] Tommy Becker commented on KAFKA-5440: - Any update on this? We're seeing the same behavior, and though there have been some changes in this area in trunk, I don't see anything that would fix this particular problem. org.apache.kafka.streams.KafkaStreams.StreamStateListener, which is called back on the state changes of the thread, still doesn't seem to care about transitions to terminal states. > Kafka Streams report state RUNNING even if all threads are dead > --- > > Key: KAFKA-5440 > URL: https://issues.apache.org/jira/browse/KAFKA-5440 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1, 0.11.0.0 >Reporter: Matthias J. Sax > > From the mailing list: > {quote} > Hi All, > We recently implemented a health check for a Kafka Streams based application. > The health check is simply checking the state of Kafka Streams by calling > KafkaStreams.state(). It reports healthy if it’s not in PENDING_SHUTDOWN or > NOT_RUNNING states. > We truly appreciate having the possibility to easily check the state of Kafka > Streams but to our surprise we noticed that KafkaStreams.state() returns > RUNNING even though all StreamThreads has crashed and reached NOT_RUNNING > state. Is this intended behaviour or is it a bug? Semantically it seems weird > to me that KafkaStreams would say it’s RUNNING when it is in fact not > consuming anything since all underlying working threads has crashed. > If this is intended behaviour I would appreciate an explanation of why that > is the case. Also in that case, how could I determine if the consumption from > Kafka hasn’t crashed? > If this is not intended behaviour, how fast could I expect it to be fixed? I > wouldn’t mind fixing it myself but I’m not sure if this is considered trivial > or big enough to require a JIRA. 
Also, if I would implement a fix I’d like > your input on what would be a reasonable solution. By just inspecting to code > I have an idea but I’m not sure I understand all the implication so I’d be > happy to hear your thoughts first. > {quote} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
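The health check described in the quoted mail can be sketched as below. The enum mirrors the state names mentioned in the thread, but it is a self-contained stand-in, not the real KafkaStreams.State; the point is that a check like this only works if the top-level state actually leaves RUNNING when all threads die, which is exactly what the issue reports it does not do.

```java
// Self-contained sketch of the KafkaStreams.state()-based health check
// from the mailing-list quote above. "State" here is a stand-in enum.
public class StreamsHealthSketch {
    enum State { CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING }

    // Healthy unless the instance is shutting down or stopped. Note the
    // reported bug: if state() stays RUNNING after every StreamThread has
    // died, this check keeps returning true even though nothing is consumed.
    static boolean healthy(State state) {
        return state != State.PENDING_SHUTDOWN && state != State.NOT_RUNNING;
    }
}
```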
[jira] [Commented] (KAFKA-5379) ProcessorContext.appConfigs() should return parsed/validated values
[ https://issues.apache.org/jira/browse/KAFKA-5379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066393#comment-16066393 ] Tommy Becker commented on KAFKA-5379: - Hey [~guozhang], I should have a PR ready pretty soon; this just slipped off my radar. One thing that slightly complicates this change is that currently, users can put arbitrary values in the config map at topology build time, and later retrieve them via {{ProcessorContext.appConfigs()}}. If we change that to return {{StreamsConfig.values()}}, this will no longer be possible. So I changed {{ProcessorContext.appConfigs()}} to return a Map that has the parsed values overlaid on the originals. So it contains the parsed values for known configs, but other unknown values are still there. > ProcessorContext.appConfigs() should return parsed/validated values > --- > > Key: KAFKA-5379 > URL: https://issues.apache.org/jira/browse/KAFKA-5379 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Tommy Becker >Assignee: Tommy Becker >Priority: Minor > > As part of KAFKA-5334, it was decided that the current behavior of > {{ProcessorContext.appConfigs()}} is sub-optimal in that it returns the > original unparsed config values. Alternatively, the parsed values could be > returned, which would allow callers to know what they are getting as well as > avoid duplicating type conversions (e.g. className -> class). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
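The overlay described in the comment — parsed values winning for known configs while arbitrary user-supplied entries survive — amounts to a simple map merge. A minimal sketch (class and method names are illustrative, not the actual Streams code):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the appConfigs() overlay: start from the original user-supplied
// map, then overwrite known keys with their parsed/validated values, so
// unknown pass-through entries are preserved.
public class AppConfigsOverlaySketch {
    static Map<String, Object> overlay(Map<String, Object> originals,
                                       Map<String, Object> parsed) {
        Map<String, Object> result = new HashMap<>(originals);
        result.putAll(parsed); // parsed values win for known configs
        return result;
    }
}
```

For example, an unparsed `"num.stream.threads" -> "4"` would be replaced by the parsed `Integer 4`, while a custom entry like `"my.custom.key" -> "x"` remains retrievable, which is the compatibility concern raised in the comment.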