[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2024-02-14 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817385#comment-17817385
 ] 

Tommy Becker commented on KAFKA-13505:
--

I'm wondering if this bug is being overlooked because it's described in terms 
of third-party libraries like Confluent's S3 connector and Avro. But the issue 
is actually in the SchemaProjector class, which is part of Kafka Connect 
itself. Confluent's AvroConverter represents Avro enums in a Connect Schema as 
Strings with "parameters" corresponding to each enum value. Even though these 
parameters appear to be intended purely as metadata, 
SchemaProjector.checkMaybeCompatible() requires the parameters of each field to 
*exactly* match, so when values are added to an Avro enum, new parameters are 
added to the generated Connect Schema, breaking this check. Intuitively, it 
feels like this check could be less strict, but there is no specification for 
Connect Schema that I can find, so I don't know whether requiring this parameter 
equality is correct or not.
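
For context, the check in question boils down to a strict map comparison along these lines (a rough sketch, not the literal Kafka Connect source; {{source}} and {{target}} are the Connect schemas being projected):

{code:java}
// Rough sketch of the strict parameter check described above (not the literal source).
// source and target are org.apache.kafka.connect.data.Schema instances being projected.
if (!Objects.equals(source.parameters(), target.parameters())) {
    // An Avro enum that gained a symbol yields an extra
    // "io.confluent.connect.avro.Enum.value3" parameter on one side, so projection fails
    // here even though Avro itself considers the two schemas compatible.
    throw new SchemaProjectorException("Schema parameters not equal. source parameters: "
            + source.parameters() + " and target parameters: " + target.parameters());
}
{code}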

> Kafka Connect should respect Avro 1.10.X enum defaults spec
> ---
>
> Key: KAFKA-13505
> URL: https://issues.apache.org/jira/browse/KAFKA-13505
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Guus De Graeve
>Priority: Major
>
> We are using Kafka Connect to pipe data from Kafka topics into parquet files 
> on S3. Our Kafka data is serialised using Avro (schema registry). We use the 
> Amazon S3 Sink Connector for this.
> Up until recently we would set "schema.compatibility" to "NONE" in our 
> connectors, but this had the painful side effect that during deploys of our 
> application we got huge file explosions (lots of very small files in HDFS / 
> S3). This happens because Kafka Connect will create a new file every time the 
> schema id of a log changes compared to the previous log. During deploys of 
> our applications (which can take up to 20 minutes), multiple logs of mixed 
> schema ids are inevitable, and given the huge amount of logs, file explosions 
> of up to a million files weren't uncommon.
> To solve this problem we switched all our connectors' "schema.compatibility" 
> to "BACKWARD", which should only create a new file when a higher schema id is 
> detected and deserialise all logs with the latest known schema id, so only 
> one new file should be created during deploys.
> An example connector config:
> {code:java}
> {
> "name": "hdfs-Project_Test_Subject",
> "config": {
> "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
> "partition.duration.ms": "8640",
> "topics.dir": "/user/kafka/Project",
> "hadoop.conf.dir": "/opt/hadoop/conf",
> "flush.size": "100",
> "schema.compatibility": "BACKWARD",
> "topics": "Project_Test_Subject",
> "timezone": "UTC",
> "hdfs.url": "hdfs://hadoophost:9000",
> "value.converter.value.subject.name.strategy": 
> "io.confluent.kafka.serializers.subject.TopicNameStrategy",
> "rotate.interval.ms": "720",
> "locale": "C",
> "hadoop.home": "/opt/hadoop",
> "logs.dir": "/user/kafka/_logs",
> "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
> "partitioner.class": 
> "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
> "name": "hdfs-Project_Test_Subject",
> "errors.tolerance": "all",
> "storage.class": "io.confluent.connect.hdfs.storage.HdfsStorage",
> "path.format": "/MM/dd"
> }
> }{code}
> However, we have lots of enum fields in our data records (Avro schemas) to 
> which values get added frequently, and this is causing our 
> Kafka Connect connectors to FAIL with these kinds of errors:
> {code:java}
> Schema parameters not equal. source parameters: 
> {io.confluent.connect.avro.enum.default.testfield=null, 
> io.confluent.connect.avro.Enum=Ablo.testfield, 
> io.confluent.connect.avro.Enum.null=null, 
> io.confluent.connect.avro.Enum.value1=value1, 
> io.confluent.connect.avro.Enum.value2=value2} and target parameters: 
> {io.confluent.connect.avro.enum.default.testfield=null, 
> io.confluent.connect.avro.Enum=Ablo.testfield, 
> io.confluent.connect.avro.Enum.null=null, 
> io.confluent.connect.avro.Enum.value1=value1, 
> io.confluent.connect.avro.Enum.value2=value2, 
> io.confluent.connect.avro.Enum.value3=value3}{code}
> Since the Avro 1.10.X specification, enums support default values, which makes 
> schema evolution possible even when adding values to an enum. When 
> testing our schemas for compatibility using the Schema Registry API we always 
> get "is_compatible" => true, so schema evolution should in theory not be a 
> problem.
> The error above is thrown in 
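
As an aside, the Avro-level compatibility claim above is easy to check directly. A minimal sketch, assuming Avro 1.10+ on the classpath; the two schema strings are simplified stand-ins for the real subject, and recent Avro versions should report the pair as compatible thanks to the enum default:

{code:java}
Schema oldEnum = new Schema.Parser().parse(
    "{\"type\":\"enum\",\"name\":\"testfield\",\"symbols\":[\"value1\",\"value2\"],\"default\":\"value1\"}");
Schema newEnum = new Schema.Parser().parse(
    "{\"type\":\"enum\",\"name\":\"testfield\",\"symbols\":[\"value1\",\"value2\",\"value3\"],\"default\":\"value1\"}");

// A reader that only knows the old symbols can still resolve data written with the new
// schema, because an unknown writer symbol falls back to the declared enum default.
SchemaCompatibility.SchemaPairCompatibility result =
    SchemaCompatibility.checkReaderWriterCompatibility(oldEnum, newEnum);
System.out.println(result.getType()); // expected: COMPATIBLE
{code}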

[jira] [Commented] (KAFKA-16018) KafkaStreams can go into a zombie state if UncaughtExceptionHandler is specified via the deprecated method

2023-12-19 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798636#comment-17798636
 ] 

Tommy Becker commented on KAFKA-16018:
--

I think that's true, but wasn't sure on the timing of 4.0.

> KafkaStreams can go into a zombie state if UncaughtExceptionHandler is 
> specified via the deprecated method
> --
>
> Key: KAFKA-16018
> URL: https://issues.apache.org/jira/browse/KAFKA-16018
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Tommy Becker
>Priority: Major
>
> We have a streams application in which all StreamThreads died due to a lack 
> of disk space. To our surprise, the KafkaStreams instance still reported its 
> state as running. Upon further investigation, it appears this is due to the 
> application setting an UncaughtExceptionHandler via the deprecated method 
> (this application was recently upgraded from 2.4.1): 
> [https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#setUncaughtExceptionHandler(java.lang.Thread.UncaughtExceptionHandler)]
> The only way a StreamThread failure can cause the KafkaStreams instance to 
> transition to an error state now is via the new 
> StreamsUncaughtExceptionHandler machinery, but when an old 
> UncaughtExceptionHandler is set by the old method this code is effectively 
> bypassed.
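
For anyone hitting this, a minimal sketch of the non-deprecated handler that does feed back into the KafkaStreams state machine (assumes Kafka Streams 2.8+; {{topology}} and {{props}} are whatever the application already builds):

{code:java}
KafkaStreams streams = new KafkaStreams(topology, props);

// The StreamsUncaughtExceptionHandler variant lets a StreamThread failure drive a state
// transition, so the instance shuts down (or replaces the thread) instead of lingering in RUNNING.
streams.setUncaughtExceptionHandler((Throwable exception) ->
    StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);

// The deprecated java.lang.Thread.UncaughtExceptionHandler overload only observes the dead
// thread; KafkaStreams itself keeps reporting RUNNING, which is the zombie state described above.
{code}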



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16018) KafkaStreams can go into a zombie state if UncaughtExceptionHandler is specified via the deprecated method

2023-12-15 Thread Tommy Becker (Jira)
Tommy Becker created KAFKA-16018:


 Summary: KafkaStreams can go into a zombie state if 
UncaughtExceptionHandler is specified via the deprecated method
 Key: KAFKA-16018
 URL: https://issues.apache.org/jira/browse/KAFKA-16018
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.3.2
Reporter: Tommy Becker


We have a streams application in which all StreamThreads died due to a lack of 
disk space. To our surprise, the KafkaStreams instance still reported its state 
as running. Upon further investigation, it appears this is due to the 
application setting an UncaughtExceptionHandler via the deprecated method (this 
application was recently upgraded from 2.4.1): 
[https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#setUncaughtExceptionHandler(java.lang.Thread.UncaughtExceptionHandler)]

The only way a StreamThread failure can cause the KafkaStreams instance to 
transition to an error state now is via the new StreamsUncaughtExceptionHandler 
machinery, but when an old UncaughtExceptionHandler is set by the old method 
this code is effectively bypassed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins

2021-08-20 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17402213#comment-17402213
 ] 

Tommy Becker commented on KAFKA-12317:
--

At least in the case of left stream-globalTable joins, this is how it used to 
work prior to 2.7/KAFKA-10277. I agree that it would be easier to reason about 
if these operations did not drop any records and relied on filter operations 
instead.
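
To make the filter-based alternative concrete, here is a minimal sketch; {{Event}}, {{Profile}}, and {{enrich()}} are hypothetical names, and the point is only that dropping null-(join)key records becomes an explicit step in the topology:

{code:java}
// Hypothetical types: Event carries an optional profileId used as the join key; enrich() merges both sides.
KStream<String, Event> events = builder.stream("events");
GlobalKTable<String, Profile> profiles = builder.globalTable("profiles");

events
    // explicit, user-visible choice to drop records that cannot be joined
    .filter((key, event) -> event != null && event.getProfileId() != null)
    .leftJoin(profiles,
        (key, event) -> event.getProfileId(),
        (event, profile) -> enrich(event, profile));
{code}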

> Relax non-null key requirement for left/outer KStream joins
> ---
>
> Key: KAFKA-12317
> URL: https://issues.apache.org/jira/browse/KAFKA-12317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Juan C. Gonzalez-Zurita
>Priority: Major
>
> Currently, for stream-stream and stream-table/globalTable joins, KafkaStreams 
> drops all stream records with a `null`-key (`null`-join-key for 
> stream-globalTable), because for a `null`-(join)key the join is undefined: 
> ie, we don't have an attribute to do the table lookup with (we consider the 
> stream-record as malformed). Note that we define the semantics of 
> _left/outer_ join as: keep the stream record if no matching join record was 
> found.
> We could relax the definition of _left_ stream-table/globalTable and 
> _left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
> records, and call the ValueJoiner with a `null` "other-side" value instead: 
> if the stream record key (or join-key) is `null`, we could treat it as a 
> "failed lookup" instead of treating the stream record as corrupted.
> If we make this change, users that want to keep the current behavior can add 
> a `filter()` before the join to drop `null`-(join)key records from the stream 
> explicitly.
> Note that this change also requires changing the behavior when we insert a 
> repartition topic before the join: currently, we drop `null`-key records 
> before writing into the repartition topic (as we know they would be dropped 
> later anyway). We need to relax this behavior for a left stream-table and 
> left/outer stream-stream join. Users need to be aware (ie, we might need to 
> put this into the docs and JavaDocs) that records with a `null`-key would be 
> partitioned randomly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13197) KStream-GlobalKTable join semantics don't match documentation

2021-08-20 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17402211#comment-17402211
 ] 

Tommy Becker commented on KAFKA-13197:
--

Hey [~guozhang] thanks for the response. To be clear, I'm looking at 
{{KStream.leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)}} as shown 
[here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java#L2964]
 and similar signatures, which still have the (incorrect) verbiage I quoted.

It seems from the churn in this area that there is a need to allow both null 
stream key/value records as well as non-null stream-side records that map to 
null join keys, though my use-case and this issue are specifically about the 
latter, which used to work prior to 2.7.
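
For reference, the shape of the use-case in question looks roughly like this ({{Event}}, {{Profile}}, and {{enrich()}} are hypothetical names):

{code:java}
KStream<String, Event> events = builder.stream("events");
GlobalKTable<String, Profile> profiles = builder.globalTable("profiles");

events.leftJoin(profiles,
    // may return null when the incoming record has no join field
    // (a non-null stream record mapping to a null join key)
    (key, event) -> event.getProfileId(),
    // per the quoted javadoc, profile should then be passed as null;
    // since 2.7 such records are dropped instead
    (event, profile) -> enrich(event, profile));
{code}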

> KStream-GlobalKTable join semantics don't match documentation
> -
>
> Key: KAFKA-13197
> URL: https://issues.apache.org/jira/browse/KAFKA-13197
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.0
>Reporter: Tommy Becker
>Priority: Major
>
> As part of KAFKA-10277, the behavior of KStream-GlobalKTable joins was 
> changed. It appears the change was intended to merely relax a requirement but 
> it actually broke backwards compatibility. Although it does allow {{null}} 
> keys and values in the KStream to be joined, it now excludes {{null}} results 
> of the {{KeyValueMapper}}. We have an application which can return {{null}} 
> from the {{KeyValueMapper}} for non-null keys in the KStream, and relies on 
> these nulls being passed to the {{ValueJoiner}}. Indeed the javadoc still 
> explicitly says this is done:
> {quote}If a KStream input record key or value is null the record will not be 
> included in the join operation and thus no output record will be added to the 
> resulting KStream.
>  If keyValueMapper returns null implying no match exists, a null value will 
> be provided to ValueJoiner.
> {quote}
> Both these statements are incorrect.
> I think the new behavior is worse than the previous/documented behavior. It 
> feels more reasonable to have a non-null stream record map to a null join key 
> (our use-case is event-enhancement where the incoming record doesn't have the 
> join field), than the reverse.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13197) KStream-GlobalKTable join semantics don't match documentation

2021-08-12 Thread Tommy Becker (Jira)
Tommy Becker created KAFKA-13197:


 Summary: KStream-GlobalKTable join semantics don't match 
documentation
 Key: KAFKA-13197
 URL: https://issues.apache.org/jira/browse/KAFKA-13197
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.7.0
Reporter: Tommy Becker


As part of KAFKA-10277, the behavior of KStream-GlobalKTable joins was changed. 
It appears the change was intended to merely relax a requirement but it 
actually broke backwards compatibility. Although it does allow {{null}} keys 
and values in the KStream to be joined, it now excludes {{null}} results of the 
{{KeyValueMapper}}. We have an application which can return {{null}} from the 
{{KeyValueMapper}} for non-null keys in the KStream, and relies on these nulls 
being passed to the {{ValueJoiner}}. Indeed the javadoc still explicitly says 
this is done:
{quote}If a KStream input record key or value is null the record will not be 
included in the join operation and thus no output record will be added to the 
resulting KStream.
 If keyValueMapper returns null implying no match exists, a null value will be 
provided to ValueJoiner.
{quote}
Both these statements are incorrect.

I think the new behavior is worse than the previous/documented behavior. It 
feels more reasonable to have a non-null stream record map to a null join key 
(our use-case is event-enhancement where the incoming record doesn't have the 
join field), than the reverse.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167485#comment-17167485
 ] 

Tommy Becker commented on KAFKA-10324:
--

Understood, but it seems like as your fetch offset approaches the end of a 
segment you'll get a smaller batch. Just pointing out that scanning ahead to 
grab as many records as will fit into the max fetch size could be a general, 
albeit small, improvement across the board.

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167480#comment-17167480
 ] 

Tommy Becker commented on KAFKA-10324:
--

So is it always the case that records returned in a FetchResponse are all from 
a single segment? Might that not be inefficient?

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167431#comment-17167431
 ] 

Tommy Becker commented on KAFKA-10324:
--

Yes. See my comment above.

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167400#comment-17167400
 ] 

Tommy Becker edited comment on KAFKA-10324 at 7/29/20, 5:54 PM:


[~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm 
not sure. But I can tell you I see this behavior even with 
max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to 
do with down conversion?  Anyway, here's an excerpt from a dump of the segment 
containing the problematic offset, which is 13920987:

 

baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 
lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 
isTransactional: false isControl: false position: 98516844 CreateTime: 
1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: 
true
\| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
\| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
\| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
\| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
\| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
\| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
End of segment is here

 

 


was (Author: twbecker):
[~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm 
not sure. But I can tell you I see this behavior even with 
max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to 
do with down conversion?  Anyway, here's an excerpt from a dump of the segment 
containing the problematic offset, which is 13920987:

 

baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 
lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 
isTransactional: false isControl: false position: 98516844 CreateTime: 
1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: 
true
| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
End of segment is here

 

 

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167400#comment-17167400
 ] 

Tommy Becker edited comment on KAFKA-10324 at 7/29/20, 5:46 PM:


[~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm 
not sure. But I can tell you I see this behavior even with 
max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to 
do with down conversion?  Anyway, here's an excerpt from a dump of the segment 
containing the problematic offset, which is 13920987:

 

baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 
lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 
isTransactional: false isControl: false position: 98516844 CreateTime: 
1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: 
true
| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
End of segment is here

 

 


was (Author: twbecker):
[~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm 
not sure. But I can tell you I see this behavior even with 
max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to 
do with down conversion?  Anyway, here's an excerpt from a dump of the segment 
containing the problematic offset, which is 13920987:

 

baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 
lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 
isTransactional: false isControl: false position: 98516844 CreateTime: 
1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: 
true
| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []

### End of segment is here

 

 

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167400#comment-17167400
 ] 

Tommy Becker commented on KAFKA-10324:
--

[~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm 
not sure. But I can tell you I see this behavior even with 
max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to 
do with down conversion?  Anyway, here's an excerpt from a dump of the segment 
containing the problematic offset, which is 13920987:

 

baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 
lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 
isTransactional: false isControl: false position: 98516844 CreateTime: 
1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: 
true
| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []

### End of segment is here

 

 

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167371#comment-17167371
 ] 

Tommy Becker commented on KAFKA-10324:
--

Thanks for the response [~ijuma]. Yes, these are Java consumers, and I agree 
it's odd that this has not been found before now. I have limited familiarity 
with the code base, so it's possible I'm missing something, but I believe the 
issue is as I described. In my tests I'm trying to consume a 25GB topic and 
have found 2 distinct offsets which the consumer cannot advance beyond; in 
both cases the offset:
 # Is the lastOffset in the last batch of its log segment
 # Does not actually exist, presumably due to log compaction.

 

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167357#comment-17167357
 ] 

Tommy Becker commented on KAFKA-10324:
--

We have some legacy applications, whose consumer versions are not easily 
upgraded, hitting this issue. It's hard to diagnose since the consumers do 
not give a proper error message (or indeed any message, in the case of the 
0.10.1.0 consumer), and since it depends on the way messages are batched, which 
is opaque to clients.

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)
Tommy Becker created KAFKA-10324:


 Summary: Pre-0.11 consumers can get stuck when messages are 
downconverted from V2 format
 Key: KAFKA-10324
 URL: https://issues.apache.org/jira/browse/KAFKA-10324
 Project: Kafka
  Issue Type: Bug
Reporter: Tommy Becker


As noted in KAFKA-5443, the V2 message format preserves a batch's lastOffset 
even if that offset gets removed due to log compaction. If a pre-0.11 consumer 
seeks to such an offset and issues a fetch, it will get an empty batch, since 
offsets prior to the requested one are filtered out during down-conversion. 
KAFKA-5443 added consumer-side logic to advance the fetch offset in this case, 
but this leaves old consumers unable to consume these topics.

The exact behavior varies depending on consumer version. The 0.10.0.0 consumer 
throws RecordTooLargeException and dies, believing that the record must not 
have been returned because it was too large. The 0.10.1.0 consumer simply spins 
fetching the same empty batch over and over.
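
A possible application-side stop-gap, sketched under the assumption that the stuck offset is already known (e.g. from a segment dump) and that manual partition assignment is acceptable; it simply seeks past the compacted-away lastOffset so the old consumer can make progress:

{code:java}
// Hypothetical values: topic/partition and the stuck offset taken from a segment dump.
TopicPartition tp = new TopicPartition("my-topic", 0);
long stuckOffset = 13920987L;

consumer.assign(Collections.singletonList(tp));
consumer.seek(tp, stuckOffset + 1); // skip the offset the down-converted fetch can never return
{code}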



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-7663) Custom Processor supplied on addGlobalStore is not used when restoring state from topic

2019-11-27 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16983797#comment-16983797
 ] 

Tommy Becker edited comment on KAFKA-7663 at 11/27/19 6:25 PM:
---

Thanks for the comment [~vvcephei] . In the reporter's defense, I'm pretty sure 
that javadoc was added after this bug was opened ;)  This is a pretty important 
use-case for us. The topic we build the global store from is heterogeneous; we 
only need some types of records and need to rekey others prior to joining to 
our event stream. Personally, I feel like your second option (i.e. maintaining 
a real changelog topic) is probably overkill and simply running the records 
through the processors during restoration is adequate. Seems like we could even 
keep the current "fast path" restoration that bypasses serde when there is no 
custom processor configured.

With respect to your recommended work-around, the problem with that is that if 
I'm not mistaken you lose the critical property of the global store being fully 
populated when the rest of the topology starts up. Because although the store 
will wait for "my-global-changelog" topic to be caught up, that doesn't mean 
much since that topic is itself fed from the main topology.


was (Author: twbecker):
Thanks for the comment [~vvcephei] . In the reporter's defense, I'm pretty sure 
that javadoc was added before this bug was opened ;)  This is a pretty 
important use-case for us. The topic we build the global store from is 
heterogeneous; we only need some types of records and need to rekey others 
prior to joining to our event stream. Personally, I feel like your second 
option (i.e. maintaining a real changelog topic) is probably overkill and 
simply running the records through the processors during restoration is 
adequate. Seems like we could even keep the current "fast path" restoration 
that bypasses serde when there is no custom processor configured.

With respect to your recommended work-around, the problem with that is that if 
I'm not mistaken you lose the critical property of the global store being fully 
populated when the rest of the topology starts up. Because although the store 
will wait for "my-global-changelog" topic to be caught up, that doesn't mean 
much since that topic is itself fed from the main topology.

> Custom Processor supplied on addGlobalStore is not used when restoring state 
> from topic
> ---
>
> Key: KAFKA-7663
> URL: https://issues.apache.org/jira/browse/KAFKA-7663
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Frederic Tardif
>Priority: Major
> Attachments: image-2018-11-20-11-42-14-697.png
>
>
> I have implemented a StreamBuilder#{{addGlobalStore}} supplying a custom 
> processor responsible for transforming a K,V record from the input stream into 
> V,K records. It works fine and my {{store.all()}} does print the correct 
> persisted V,K records. However, if I clean the local store and restart the 
> stream app, the global table is reloaded but without going through the 
> processor supplied; instead, it calls {{GlobalStateManagerImp#restoreState}} 
> which simply stores the input topic K,V records into rocksDB (hence bypassing 
> the mapping function of my custom processor). I believe this must not be the 
> expected result?
>  This is a follow up on stackoverflow discussion around storing a K,V topic 
> as a global table with some stateless transformations based on a "custom" 
> processor added on the global store:
> [https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729]
> If we address this issue, we should also apply 
> `default.deserialization.exception.handler` during restore (cf. KAFKA-8037)
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7663) Custom Processor supplied on addGlobalStore is not used when restoring state from topic

2019-11-27 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16983797#comment-16983797
 ] 

Tommy Becker commented on KAFKA-7663:
-

Thanks for the comment [~vvcephei] . In the reporter's defense, I'm pretty sure 
that javadoc was added before this bug was opened ;)  This is a pretty 
important use-case for us. The topic we build the global store from is 
heterogeneous; we only need some types of records and need to rekey others 
prior to joining to our event stream. Personally, I feel like your second 
option (i.e. maintaining a real changelog topic) is probably overkill and 
simply running the records through the processors during restoration is 
adequate. Seems like we could even keep the current "fast path" restoration 
that bypasses serde when there is no custom processor configured.

With respect to your recommended work-around, the problem with that is that if 
I'm not mistaken you lose the critical property of the global store being fully 
populated when the rest of the topology starts up. Because although the store 
will wait for "my-global-changelog" topic to be caught up, that doesn't mean 
much since that topic is itself fed from the main topology.

> Custom Processor supplied on addGlobalStore is not used when restoring state 
> from topic
> ---
>
> Key: KAFKA-7663
> URL: https://issues.apache.org/jira/browse/KAFKA-7663
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Frederic Tardif
>Priority: Major
> Attachments: image-2018-11-20-11-42-14-697.png
>
>
> I have implemented a StreamBuilder#{{addGlobalStore}} supplying a custom 
> processor responsible for transforming a K,V record from the input stream into 
> V,K records. It works fine and my {{store.all()}} does print the correct 
> persisted V,K records. However, if I clean the local store and restart the 
> stream app, the global table is reloaded but without going through the 
> processor supplied; instead, it calls {{GlobalStateManagerImp#restoreState}} 
> which simply stores the input topic K,V records into rocksDB (hence bypassing 
> the mapping function of my custom processor). I believe this must not be the 
> expected result?
>  This is a follow up on stackoverflow discussion around storing a K,V topic 
> as a global table with some stateless transformations based on a "custom" 
> processor added on the global store:
> [https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729]
> If we address this issue, we should also apply 
> `default.deserialization.exception.handler` during restore (cf. KAFKA-8037)
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8143) Kafka-Streams GlobalStore cannot be read after application restart

2019-03-21 Thread Tommy Becker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16798573#comment-16798573
 ] 

Tommy Becker commented on KAFKA-8143:
-

I think this is possibly due to 
https://issues.apache.org/jira/browse/KAFKA-7663, which renders 
addGlobalStore() useless for its intended purpose.

> Kafka-Streams GlobalStore cannot be read after application restart
> --
>
> Key: KAFKA-8143
> URL: https://issues.apache.org/jira/browse/KAFKA-8143
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Luke Stephenson
>Priority: Major
>
> I've created a small example application which has a trivial `Processor` 
> which takes messages and stores the length of the String value rather than 
> the value itself.
> That is, the following setup:
> {code:java}
> Topic[String, String]
> Processor[String, String]
> KeyValueStore[String, Long] // Note the Store persists Long values
> {code}
>  
> The example application also has a Thread which periodically displays all 
> values in the KeyValueStore.
> While the application is run, I can publish values to the topic with:
> {code:java}
> root@kafka:/opt/kafka# bin/kafka-console-producer.sh --property 
> "parse.key=true" --property "key.separator=:" --broker-list localhost:9092 
> --topic test.topic
> >1:hello
> >2:abc{code}
> And the background Thread will report the values persisted to the key value 
> store.
> If the application is restarted, when attempting to read from the 
> KeyValueStore it will fail.  It attempts to recover the state from the 
> persistent RocksDB store which fails with:
> {code:java}
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by LongDeserializer is not 8{code}
> (Note there is no stack trace as SerializationException has disabled it.)
> Debugging appears to reveal that the original data from the Topic is being 
> restored rather than what was modified by the processor.
> I've created a minimal example to show the issue at 
> [https://github.com/lukestephenson/kafka-streams-example]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7872) Consider allowing transformations on GlobalKTable before materialization

2019-01-28 Thread Tommy Becker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16753989#comment-16753989
 ] 

Tommy Becker commented on KAFKA-7872:
-

[~mjsax] Yes I believe it is.

> Consider allowing transformations on GlobalKTable before materialization
> 
>
> Key: KAFKA-7872
> URL: https://issues.apache.org/jira/browse/KAFKA-7872
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> Current GlobalKTable's interface does not allow any transformations on itself:
> {code}
> public interface GlobalKTable {
> /**
>  * Get the name of the local state store that can be used to query this 
> {@code GlobalKTable}.
>  *
>  * @return the underlying state store name, or {@code null} if this 
> {@code GlobalKTable} cannot be queried.
>  */
> String queryableStoreName();
> }
> {code}
> This limits use cases such that users want to read the source topic, and do 
> some filtering / value transformations / reformatting etc before materialize 
> it to the backing state stores for further operations like joins. On the 
> other hand, for KTable today we already allow the source KTable to be 
> filtered / value-mapped etc which would, behind the scene, apply those 
> applications on the fly before materializing to the state stores.
> We should consider adding such functionalities for GlobalKTable as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7397) Ability to apply DSL stateless transformation on a global table

2019-01-28 Thread Tommy Becker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16753988#comment-16753988
 ] 

Tommy Becker commented on KAFKA-7397:
-

It's also worth noting that using the given workaround, where you have a 
separate streams app pre-processing the global table backing topic, you 
effectively lose the ability to fully bootstrap the table before proceeding 
with your main processing logic.

> Ability to apply DSL stateless transformation on a global table
> ---
>
> Key: KAFKA-7397
> URL: https://issues.apache.org/jira/browse/KAFKA-7397
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Frederic Tardif
>Priority: Major
>  Labels: needs-kip
> Attachments: kafka.zip
>
>
> When consuming a globalKTable (with the expectation of caching all the data 
> of a topic in a consumer store), we can't apply any stateless transformation 
> (filter, map), prior to materializing. To achieve this, while ensuring to 
> consume the records of all the partitions, we must first run a stream app 
> that does preprocessing on the ingress topic into an exact K1,V1 egress topic 
> as we want to store in our GlobalKTable. This looks unnecessarily complex, 
> and causes to double the storage of the topic, while the only goal is to 
> adapt statelessly the data prior to storing (rockDB) at the receiving end.
> See discussion on 
> :[https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic]
> As a workaround, I have used `new Builder().addGlobalStore()` with a 
> Custom Processor able to filter and map prior to store (see attached). 
> Although this seem to work, I believe this functionality should be part of 
> the basic dsl api when working with a globalTable (`new 
> StreamsBuilder().globalTable().filter(...).map()... `).
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7663) Custom Processor supplied on addGlobalStore is not used when restoring state from topic

2019-01-28 Thread Tommy Becker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16753984#comment-16753984
 ] 

Tommy Becker commented on KAFKA-7663:
-

I hit this as well; it really makes {{Topology#addGlobalStore()}} close to 
useless since presumably the only reason you are using a custom processor to 
maintain the store is because you do not want the default behavior of simply 
materializing the topic as-is. In this particular instance I was able to work 
around this by iterating the store in the custom processor's {{init()}} method 
and making some changes, though it's quite nasty.
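
For illustration, the kind of re-keying global store described in this issue looks roughly like the sketch below (old Processor API, hypothetical topic and store names, {{builder}} is a {{StreamsBuilder}}); live records flow through the processor, but restoration bypasses it, which is the problem:

{code:java}
builder.addGlobalStore(
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("rekeyed-global-store"),
            Serdes.String(), Serdes.String())
        .withLoggingDisabled(),                       // global stores must not have a changelog
    "source-topic",
    Consumed.with(Serdes.String(), Serdes.String()),
    () -> new Processor<String, String>() {
        private KeyValueStore<String, String> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            store = (KeyValueStore<String, String>) context.getStateStore("rekeyed-global-store");
        }

        @Override
        public void process(String key, String value) {
            store.put(value, key);                    // re-key: store V,K instead of K,V
        }

        @Override
        public void close() { }
    });
{code}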

> Custom Processor supplied on addGlobalStore is not used when restoring state 
> from topic
> ---
>
> Key: KAFKA-7663
> URL: https://issues.apache.org/jira/browse/KAFKA-7663
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Frederic Tardif
>Priority: Major
> Attachments: image-2018-11-20-11-42-14-697.png
>
>
> I have implemented a StreamBuilder#{{addGlobalStore}} supplying a custom 
> processor responsible for transforming a K,V record from the input stream into 
> V,K records. It works fine and my {{store.all()}} does print the correct 
> persisted V,K records. However, if I clean the local store and restart the 
> stream app, the global table is reloaded but without going through the 
> processor supplied; instead, it calls {{GlobalStateManagerImp#restoreState}} 
> which simply stores the input topic K,V records into rocksDB (hence bypassing 
> the mapping function of my custom processor). I believe this must not be the 
> expected result?
>  
> this is a follow up on stackoverflow discussion around storing a K,V topic as 
> a global table with some stateless transformations based on a "custom" 
> processor added on the global store:
> [https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729]
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7075) Allow Topology#addGlobalStore to add a window store

2019-01-17 Thread Tommy Becker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745164#comment-16745164
 ] 

Tommy Becker commented on KAFKA-7075:
-

[~lmontrieux] I'm curious what your use-case is. After experimenting with 
addGlobalStore(), I'm not sure what value it has. At first glance it seems to 
allow you to plug in a custom processor that allows you to maintain your global 
store however you wish. But the initial population of the store does not go 
through this processor, so if you want to do anything beyond loading the exact 
contents of the topic, you're out of luck. This method actually seems dangerous 
because if your custom processor does not use the keys from the topic verbatim, 
you'll get all manner of weird behavior.

> Allow Topology#addGlobalStore to add a window store
> ---
>
> Key: KAFKA-7075
> URL: https://issues.apache.org/jira/browse/KAFKA-7075
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Nishanth Pradeep
>Priority: Major
>  Labels: newbie
>
> Today although {{Topology#addGlobalStore}} can take any {{StateStore}} types, 
> the internal implementation {{InternalTopologyBuilder#addGlobalStore}} only 
> accepts {{StoreBuilder}}. It means if users pass in a windowed 
> store builder in {{Topology#addGlobalStore}} it will cause a runtime 
> ClassCastException.
> We should fix this issue by relaxing the {{InternalTopologyBuilder}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7075) Allow Topology#addGlobalStore to add a window store

2019-01-15 Thread Tommy Becker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16743104#comment-16743104
 ] 

Tommy Becker commented on KAFKA-7075:
-

Any update on this?  Would be great to get this in.

> Allow Topology#addGlobalStore to add a window store
> ---
>
> Key: KAFKA-7075
> URL: https://issues.apache.org/jira/browse/KAFKA-7075
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Nishanth Pradeep
>Priority: Major
>  Labels: newbie
>
> Today although {{Topology#addGlobalStore}} can take any {{StateStore}} types, 
> the internal implementation {{InternalTopologyBuilder#addGlobalStore}} only 
> accepts {{StoreBuilder}}. It means if users pass in a windowed 
> store builder in {{Topology#addGlobalStore}} it will cause a runtime 
> ClassCastException.
> We should fix this issue by relaxing the {{InternalTopologyBuilder}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7700) AbstractConfig does not honor Properties defaults

2018-12-03 Thread Tommy Becker (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tommy Becker reassigned KAFKA-7700:
---

Assignee: (was: Tommy Becker)

> AbstractConfig does not honor Properties defaults
> -
>
> Key: KAFKA-7700
> URL: https://issues.apache.org/jira/browse/KAFKA-7700
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 2.1.0
>Reporter: Tommy Becker
>Priority: Minor
>
> Kafka clients such as the Consumer and Producer require various configuration 
> parameters to work. In the case of the Consumer and Producer, these 
> parameters are provided by passing either a {{Map}} or 
> {{Properties}} instance to the respective constructor.
> {{Properties}} is a legacy class (akin to {{Vector}}) that adds no value 
> above {{Map}} other than the ability to wrap another 
> {{Properties}} instance that provides defaults. But Kafka negates this 
> benefit by treating the {{Properties}} instance as a {{Map}}, which only 
> works due to an unfortunate decision to have {{Properties}} extend 
> {{Hashtable}}.  Such treatment bypasses the defaults because they are only 
> consulted by {{Properties.getProperty()}}. The net result is that when 
> creating Kafka clients via {{Properties}}, any configuration from its 
> defaults is ignored.
> This has been reported several times over the years as KAFKA-1909, 
> KAFKA-2184, KAFKA-3049, and KAFKA-5514. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7700) AbstractConfig does not honor Properties defaults

2018-12-03 Thread Tommy Becker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707618#comment-16707618
 ] 

Tommy Becker commented on KAFKA-7700:
-

One option here would be to deprecate the use of {{Properties}} for 
configuring these classes, though such an approach would require a KIP. 
Alternatively we could probably add support for defaults in {{AbstractConfig}}.
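
In the meantime, a user-side workaround sketch (an assumption about how one might cope, not a fix in Kafka itself): flatten the {{Properties}}, defaults included, into a plain map before constructing the client, since {{stringPropertyNames()}} does walk the defaults chain:

{code:java}
Map<String, Object> flattened = new HashMap<>();
for (String name : props.stringPropertyNames()) {      // includes keys supplied only via defaults
    flattened.put(name, props.getProperty(name));
}
KafkaConsumer<String, String> consumer =
    new KafkaConsumer<>(flattened, new StringDeserializer(), new StringDeserializer());
{code}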

> AbstractConfig does not honor Properties defaults
> -
>
> Key: KAFKA-7700
> URL: https://issues.apache.org/jira/browse/KAFKA-7700
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 2.1.0
>Reporter: Tommy Becker
>Assignee: Tommy Becker
>Priority: Minor
>
> Kafka clients such as the Consumer and Producer require various configuration 
> parameters to work. In the case of the Consumer and Producer, these 
> parameters are provided by passing either a {{Map}} or 
> {{Properties}} instance to the respective constructor.
> {{Properties}} is a legacy class (akin to {{Vector}}) that adds no value 
> above {{Map}} other than the ability to wrap another 
> {{Properties}} instance that provides defaults. But Kafka negates this 
> benefit by treating the {{Properties}} instance as a {{Map}}, which only 
> works due to an unfortunate decision to have {{Properties}} extend 
> {{Hashtable}}.  Such treatment bypasses the defaults because they are only 
> consulted by {{Properties.getProperty()}}. The net result is that when 
> creating Kafka clients via {{Properties}}, any configuration from its 
> defaults is ignored.
> This has been reported several times over the years as KAFKA-1909, 
> KAFKA-2184, KAFKA-3049, and KAFKA-5514. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7700) AbstractConfig does not honor Properties defaults

2018-12-03 Thread Tommy Becker (JIRA)
Tommy Becker created KAFKA-7700:
---

 Summary: AbstractConfig does not honor Properties defaults
 Key: KAFKA-7700
 URL: https://issues.apache.org/jira/browse/KAFKA-7700
 Project: Kafka
  Issue Type: Bug
  Components: config
Affects Versions: 2.1.0
Reporter: Tommy Becker
Assignee: Tommy Becker


Kafka clients such as the Consumer and Producer require various configuration 
parameters to work. In the case of the Consumer and Producer, these parameters 
are provided by passing either a {{Map}} or {{Properties}} instance 
to the respective constructor.

{{Properties}} is a legacy class (akin to {{Vector}}) that adds no value above 
{{Map}} other than the ability to wrap another {{Properties}} 
instance that provides defaults. But Kafka negates this benefit by treating the 
{{Properties}} instance as a {{Map}}, which only works due to an unfortunate 
decision to have {{Properties}} extend {{Hashtable}}.  Such treatment bypasses 
the defaults because they are only consulted by {{Properties.getProperty()}}. 
The net result is that when creating Kafka clients via {{Properties}}, any 
configuration from its defaults is ignored.

This has been reported several times over the years as KAFKA-1909, KAFKA-2184, 
KAFKA-3049, and KAFKA-5514. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2018-07-06 Thread Tommy Becker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535220#comment-16535220
 ] 

Tommy Becker commented on KAFKA-4113:
-

[~mjsax] has any thought been given to making the strategy for choosing which 
topic to process from next pluggable? I feel like the current timestamp behavior 
is one such strategy, but for some other use-cases a simple topic-level 
prioritization would be sufficient. For example, in the case where the 
table-backing topic receives far less traffic than the stream topic, I think it 
could be reasonable to always prefer messages from the table topic over the 
stream topic. Such a scheme could work for a lot of cases and is quite a bit 
easier to reason about and implement.
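
As a rough illustration of what I have in mind (a purely hypothetical interface, 
not an existing Streams API), something like the following would be enough for 
the table-over-stream case:

{code:java}
import java.util.Deque;
import java.util.Map;

// Hypothetical pluggable strategy: pick which topic's buffered record to process next.
interface TopicChooser {
    String choose(Map<String, Deque<Object>> bufferedByTopic);
}

// Simple topic-level prioritization: always drain the table-backing topic first.
class TablePreferringChooser implements TopicChooser {
    private final String tableTopic;

    TablePreferringChooser(String tableTopic) {
        this.tableTopic = tableTopic;
    }

    @Override
    public String choose(Map<String, Deque<Object>> bufferedByTopic) {
        Deque<Object> table = bufferedByTopic.get(tableTopic);
        if (table != null && !table.isEmpty()) {
            return tableTopic; // table updates always win
        }
        // otherwise fall back to any topic that has buffered data
        return bufferedByTopic.entrySet().stream()
                .filter(e -> !e.getValue().isEmpty())
                .map(Map.Entry::getKey)
                .findFirst()
                .orElse(null);
    }
}
{code}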

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple requests about the possibility to 
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the 
> data. Only after this topic has been read completely and the KTable is ready 
> should the application start processing. This would indicate that on startup, 
> the current partition sizes must be fetched and stored, and after the KTable 
> has been populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-4113) Allow KTable bootstrap

2018-07-05 Thread Tommy Becker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534259#comment-16534259
 ] 

Tommy Becker edited comment on KAFKA-4113 at 7/5/18 11:09 PM:
--

Coming from Samza, I find it very surprising that there is no way to do this. 
If I have 2 topics with existing data, 1 stream and 1 table, and write a 
KafkaStreams application to do a join, it seems very likely that initial 
records in the result (possibly quite many) will not be joined properly, as the 
corresponding message in the table-backing topic has not yet been read.

The timestamp semantics make sense in that I suppose there are some use-cases 
where you'd consider the value that was current in the table at the time of 
some incoming message as "better" than the latest value (though I suspect they 
are a minority). But in reality, the table-backing topic is almost certainly 
log-compacted, which means you can't achieve these semantics regardless, as the 
older values are now gone; worse, the new values have newer timestamps, 
which perpetuate the problem we're talking about.


was (Author: twbecker):
Coming from Samza, I find it very surprising that there is no way to do this. 
If I have 2 topics with existing data, 1 stream and 1 table, and write a 
KafkaStreams application to do a join, it seems very likely that initial 
records in the stream (possibly quite many) will not be joined properly, as the 
corresponding message in the table-backing topic has not yet been read.

The timestamp semantics make sense in that I suppose there are some use-cases 
where you'd consider the value that was current in the table at the time of 
some incoming message as "better" than the latest value (though I suspect they 
are a minority). But in reality, the table-backing topic is almost certainly 
log-compacted, which means you can't achieve these semantics regardless, as the 
older values are now gone; worse, the new values have newer timestamps, 
which perpetuate the problem we're talking about.

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple requests about the possibility to 
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the 
> data. Only after this topic has been read completely and the KTable is ready 
> should the application start processing. This would indicate that on startup, 
> the current partition sizes must be fetched and stored, and after the KTable 
> has been populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-4113) Allow KTable bootstrap

2018-07-05 Thread Tommy Becker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534259#comment-16534259
 ] 

Tommy Becker edited comment on KAFKA-4113 at 7/5/18 11:04 PM:
--

Coming from Samza, I find it very surprising that there is no way to do this. 
If I have 2 topics with existing data, 1 stream and 1 table, and write a 
KafkaStreams application to do a join, it seems very likely that initial 
records in the stream (possibly quite many) will not be joined properly, as the 
corresponding message in the table-backing topic has not yet been read.

The timestamp semantics make sense in that I suppose there are some use-cases 
where you'd consider the value that was current in the table at the time of 
some incoming message as "better" than the latest value (though I suspect they 
are a minority). But in reality, the table-backing topic is almost certainly 
log-compacted, which means you can't achieve these semantics regardless, as the 
older values are now gone; worse, the new values have newer timestamps, 
which perpetuate the problem we're talking about.


was (Author: twbecker):
Coming from Samza, I find it very surprising that there is no way to do this. 
If I have 2 topics, 1 stream and 1 table with existing data, and write a 
KafkaStreams application to do a join, it seems very likely that initial 
records in the stream (possibly quite many) will not be joined properly, as the 
corresponding message in the table-backing topic has not yet been read.

The timestamp semantics make sense in that I suppose there are some use-cases 
where you'd consider the value that was current in the table at the time of 
some incoming message as "better" than the latest value (though I suspect they 
are a minority). But in reality, the table-backing topic is almost certainly 
log-compacted, which means you can't achieve these semantics regardless, as the 
older values are now gone; worse, the new values have newer timestamps, 
which perpetuate the problem we're talking about.

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple requests about the possibility to 
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the 
> data. Only after this topic has been read completely and the KTable is ready 
> should the application start processing. This would indicate that on startup, 
> the current partition sizes must be fetched and stored, and after the KTable 
> has been populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2018-07-05 Thread Tommy Becker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534259#comment-16534259
 ] 

Tommy Becker commented on KAFKA-4113:
-

Coming from Samza, I find it very surprising that there is no way to do this. 
If I have 2 topics, 1 stream and 1 table with existing data, and write a 
KafkaStreams application to do a join, it seems very likely that initial 
records in the stream (possibly quite many) will not be joined properly, as the 
corresponding message in the table-backing topic has not yet been read.

The timestamp semantics make sense in that I suppose there are some use-cases 
where you'd consider the value that was current in the table at the time of 
some incoming message as "better" than the latest value (though I suspect they 
are a minority). But in reality, the table-backing topic is almost certainly 
log-compacted, which means you can't achieve these semantics regardless, as the 
older values are now gone; worse, the new values have newer timestamps, 
which perpetuate the problem we're talking about.
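
To make the scenario concrete, here is a minimal stream-table join of the kind 
described (topic names are made up, and default serdes are assumed to be 
configured):

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> profiles = builder.table("profiles"); // table-backing topic
KStream<String, String> clicks = builder.stream("clicks");   // stream topic

// Early "clicks" records may find no matching "profiles" entry if the table
// topic has not been read far enough yet, so the (inner) join drops them.
clicks.join(profiles, (click, profile) -> click + " by " + profile)
      .to("enriched-clicks");
{code}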

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple requests about the possibility to 
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the 
> data. Only after this topic has been read completely and the KTable is ready 
> should the application start processing. This would indicate that on startup, 
> the current partition sizes must be fetched and stored, and after the KTable 
> has been populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6259) Make KafkaStreams.cleanup() clean global state directory

2017-11-22 Thread Tommy Becker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262987#comment-16262987
 ] 

Tommy Becker commented on KAFKA-6259:
-

I literally just noticed this yesterday, and was wondering if it was a 
deliberate omission. 

> Make KafkaStreams.cleanup() clean global state directory
> 
>
> Key: KAFKA-6259
> URL: https://issues.apache.org/jira/browse/KAFKA-6259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.2, 1.0, 1.0.1
>Reporter: Damian Guy
>
> We have {{KafkaStreams#cleanUp}} so that developers can remove all local 
> state during development, i.e., so they can start from a clean slate. 
> However, this presently doesn't clean up the global state directory.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6069) Streams metrics tagged incorrectly

2017-10-17 Thread Tommy Becker (JIRA)
Tommy Becker created KAFKA-6069:
---

 Summary: Streams metrics tagged incorrectly
 Key: KAFKA-6069
 URL: https://issues.apache.org/jira/browse/KAFKA-6069
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
Reporter: Tommy Becker
Assignee: Tommy Becker
Priority: Minor


KafkaStreams attempts to tag many (all?) of its metrics with the client id. 
But instead of retrieving the value from the config, it tags them with the 
literal "client.id", as can be seen at 
org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java:114
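
The difference, illustratively (this is not the actual source, and {{config}} 
stands for the {{StreamsConfig}} in scope):

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.streams.StreamsConfig;

Map<String, String> tags = new LinkedHashMap<>();

// Buggy pattern: the metric tag gets the literal config key "client.id"
tags.put("client-id", StreamsConfig.CLIENT_ID_CONFIG);

// Intended pattern: the tag gets the value the user actually configured
tags.put("client-id", config.getString(StreamsConfig.CLIENT_ID_CONFIG));
{code}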



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5886) Introduce delivery.timeout.ms producer config (KIP-91)

2017-10-02 Thread Tommy Becker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188552#comment-16188552
 ] 

Tommy Becker commented on KAFKA-5886:
-

[~sutambe] yes, that sounds great, which is why I was hoping this patch would 
get in ;) The write-up on the KIP page was very helpful and informative. We're 
struggling a bit to figure out how best to work around this issue currently, 
though. Obviously increasing {{request.timeout.ms}} even more is one option, 
but 12 already feels quite high; I'm surprised that is not enough to 
produce to a 3-node cluster with {{acks = all}}.

> Introduce delivery.timeout.ms producer config (KIP-91)
> --
>
> Key: KAFKA-5886
> URL: https://issues.apache.org/jira/browse/KAFKA-5886
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Sumant Tambe
>Assignee: Sumant Tambe
> Fix For: 1.0.0
>
>
> We propose adding a new timeout delivery.timeout.ms. The window of 
> enforcement includes batching in the accumulator, retries, and the inflight 
> segments of the batch. With this config, the user has a guaranteed upper 
> bound on when a record will either get sent, fail or expire from the point 
> when send returns. In other words we no longer overload request.timeout.ms to 
> act as a weak proxy for accumulator timeout and instead introduce an explicit 
> timeout that users can rely on without exposing any internals of the producer 
> such as the accumulator. 
> See 
> [KIP-91|https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer]
>  for more details.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5886) Introduce delivery.timeout.ms producer config (KIP-91)

2017-10-02 Thread Tommy Becker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16187976#comment-16187976
 ] 

Tommy Becker commented on KAFKA-5886:
-

Thanks for the response. Though the KIP may be an improvement, I think this 
behavior where sends to a perfectly functioning cluster are dropped is clearly 
a bug, and I'd argue a major one. We have a use case where we are reading from 
a local file and producing the records to Kafka, and even with 
{{request.timeout.ms = 12}} and {{max.in.flight.requests.per.connection = 
1}} it is still dropping records.
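
For reference, this is the kind of producer configuration being discussed, plus 
where the proposed {{delivery.timeout.ms}} would slot in (the values are 
illustrative, and {{DELIVERY_TIMEOUT_MS_CONFIG}} only exists in client versions 
that ship KIP-91):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
// Settings mentioned above (values illustrative)
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 120_000);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
// KIP-91: a single upper bound covering accumulator time, retries and in-flight requests
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 180_000);

KafkaProducer<String, String> producer =
        new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
{code}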

> Introduce delivery.timeout.ms producer config (KIP-91)
> --
>
> Key: KAFKA-5886
> URL: https://issues.apache.org/jira/browse/KAFKA-5886
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Sumant Tambe
>Assignee: Sumant Tambe
> Fix For: 1.0.0
>
>
> We propose adding a new timeout delivery.timeout.ms. The window of 
> enforcement includes batching in the accumulator, retries, and the inflight 
> segments of the batch. With this config, the user has a guaranteed upper 
> bound on when a record will either get sent, fail or expire from the point 
> when send returns. In other words we no longer overload request.timeout.ms to 
> act as a weak proxy for accumulator timeout and instead introduce an explicit 
> timeout that users can rely on without exposing any internals of the producer 
> such as the accumulator. 
> See 
> [KIP-91|https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer]
>  for more details.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5886) Introduce delivery.timeout.ms producer config (KIP-91)

2017-09-29 Thread Tommy Becker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186303#comment-16186303
 ] 

Tommy Becker edited comment on KAFKA-5886 at 9/29/17 7:51 PM:
--

Is this still on track for 1.0? We just hit an issue with records expiring in 
the accumulator when the producer is heavily loaded. The current behavior of 
expiring batches even on partitions that are making progress is quite 
unintuitive, as is the current workaround.


was (Author: twbecker):
Is this still on track for 1.0? We just hit an issue with records expiring in 
the accumulator when the producer is heavily loaded. The current behavior of 
expiring batches even on partitions that are making progress is quite 
unintuitive, as is the current fix.

> Introduce delivery.timeout.ms producer config (KIP-91)
> --
>
> Key: KAFKA-5886
> URL: https://issues.apache.org/jira/browse/KAFKA-5886
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Sumant Tambe
>Assignee: Sumant Tambe
> Fix For: 1.0.0
>
>
> We propose adding a new timeout delivery.timeout.ms. The window of 
> enforcement includes batching in the accumulator, retries, and the inflight 
> segments of the batch. With this config, the user has a guaranteed upper 
> bound on when a record will either get sent, fail or expire from the point 
> when send returns. In other words we no longer overload request.timeout.ms to 
> act as a weak proxy for accumulator timeout and instead introduce an explicit 
> timeout that users can rely on without exposing any internals of the producer 
> such as the accumulator. 
> See 
> [KIP-91|https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer]
>  for more details.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5886) Introduce delivery.timeout.ms producer config (KIP-91)

2017-09-29 Thread Tommy Becker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186303#comment-16186303
 ] 

Tommy Becker commented on KAFKA-5886:
-

Is this still on track for 1.0? We just hit an issue with records expiring in 
the accumulator when the producer is heavily loaded. The current behavior of 
expiring batches even on partitions that are making progress is quite 
unintuitive, as is the current fix.

> Introduce delivery.timeout.ms producer config (KIP-91)
> --
>
> Key: KAFKA-5886
> URL: https://issues.apache.org/jira/browse/KAFKA-5886
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Sumant Tambe
>Assignee: Sumant Tambe
> Fix For: 1.0.0
>
>
> We propose adding a new timeout delivery.timeout.ms. The window of 
> enforcement includes batching in the accumulator, retries, and the inflight 
> segments of the batch. With this config, the user has a guaranteed upper 
> bound on when a record will either get sent, fail or expire from the point 
> when send returns. In other words we no longer overload request.timeout.ms to 
> act as a weak proxy for accumulator timeout and instead introduce an explicit 
> timeout that users can rely on without exposing any internals of the producer 
> such as the accumulator. 
> See 
> [KIP-91|https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer]
>  for more details.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5440) Kafka Streams report state RUNNING even if all threads are dead

2017-08-15 Thread Tommy Becker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16127637#comment-16127637
 ] 

Tommy Becker edited comment on KAFKA-5440 at 8/15/17 5:56 PM:
--

To answer my own question, from what I can tell the PR for KAFKA-5372 will 
solve this. It would be nice to get a backport to a patch release before 1.0.


was (Author: twbecker):
To answer my own question, from what I can tell the PR for KAFKA-5372 will 
solve this. It would be nice to get a backport to patch release before 1.0.

> Kafka Streams report state RUNNING even if all threads are dead
> ---
>
> Key: KAFKA-5440
> URL: https://issues.apache.org/jira/browse/KAFKA-5440
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Matthias J. Sax
>
> From the mailing list:
> {quote}
> Hi All,
> We recently implemented a health check for a Kafka Streams based application. 
> The health check is simply checking the state of Kafka Streams by calling 
> KafkaStreams.state(). It reports healthy if it’s not in PENDING_SHUTDOWN or 
> NOT_RUNNING states. 
> We truly appreciate having the possibility to easily check the state of Kafka 
> Streams but to our surprise we noticed that KafkaStreams.state() returns 
> RUNNING even though all StreamThreads have crashed and reached NOT_RUNNING 
> state. Is this intended behaviour or is it a bug? Semantically it seems weird 
> to me that KafkaStreams would say it’s RUNNING when it is in fact not 
> consuming anything since all underlying working threads have crashed. 
> If this is intended behaviour I would appreciate an explanation of why that 
> is the case. Also in that case, how could I determine if the consumption from 
> Kafka hasn’t crashed? 
> If this is not intended behaviour, how fast could I expect it to be fixed? I 
> wouldn’t mind fixing it myself but I’m not sure if this is considered trivial 
> or big enough to require a JIRA. Also, if I would implement a fix I’d like 
> your input on what would be a reasonable solution. By just inspecting the code 
> I have an idea, but I’m not sure I understand all the implications, so I’d be 
> happy to hear your thoughts first. 
> {quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5440) Kafka Streams report state RUNNING even if all threads are dead

2017-08-15 Thread Tommy Becker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16127637#comment-16127637
 ] 

Tommy Becker commented on KAFKA-5440:
-

To answer my own question, from what I can tell the PR for KAFKA-5372 will 
solve this. It would be nice to get a backport to patch release before 1.0.

> Kafka Streams report state RUNNING even if all threads are dead
> ---
>
> Key: KAFKA-5440
> URL: https://issues.apache.org/jira/browse/KAFKA-5440
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Matthias J. Sax
>
> From the mailing list:
> {quote}
> Hi All,
> We recently implemented a health check for a Kafka Streams based application. 
> The health check is simply checking the state of Kafka Streams by calling 
> KafkaStreams.state(). It reports healthy if it’s not in PENDING_SHUTDOWN or 
> NOT_RUNNING states. 
> We truly appreciate having the possibility to easily check the state of Kafka 
> Streams but to our surprise we noticed that KafkaStreams.state() returns 
> RUNNING even though all StreamThreads have crashed and reached NOT_RUNNING 
> state. Is this intended behaviour or is it a bug? Semantically it seems weird 
> to me that KafkaStreams would say it’s RUNNING when it is in fact not 
> consuming anything since all underlying working threads have crashed. 
> If this is intended behaviour I would appreciate an explanation of why that 
> is the case. Also in that case, how could I determine if the consumption from 
> Kafka hasn’t crashed? 
> If this is not intended behaviour, how fast could I expect it to be fixed? I 
> wouldn’t mind fixing it myself but I’m not sure if this is considered trivial 
> or big enough to require a JIRA. Also, if I would implement a fix I’d like 
> your input on what would be a reasonable solution. By just inspecting the code 
> I have an idea, but I’m not sure I understand all the implications, so I’d be 
> happy to hear your thoughts first. 
> {quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5440) Kafka Streams report state RUNNING even if all threads are dead

2017-08-15 Thread Tommy Becker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16127316#comment-16127316
 ] 

Tommy Becker edited comment on KAFKA-5440 at 8/15/17 2:41 PM:
--

Any update on this? We're seeing the same behavior, and though there have been 
some changes in this area in trunk, I don't see anything that would fix this 
particular problem. org.apache.kafka.streams.KafkaStreams.StreamStateListener, 
which is called back on state changes of the thread, still doesn't seem to 
care about transitions to terminal states. This is quite frustrating because we 
too were using the KafkaStreams state to determine the health of the topology.
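
For context, here is a sketch of the kind of health check and listener wiring 
involved ({{topology}} and {{props}} are placeholders for the application's own 
topology and configuration):

{code:java}
import org.apache.kafka.streams.KafkaStreams;

KafkaStreams streams = new KafkaStreams(topology, props);
streams.setStateListener((newState, oldState) -> {
    // As reported, StreamThreads dying does not drive the instance-level state,
    // so this can keep reporting RUNNING with no live threads.
    System.out.println("streams state: " + oldState + " -> " + newState);
});
streams.start();

// Health check as described above:
boolean healthy = streams.state() != KafkaStreams.State.PENDING_SHUTDOWN
        && streams.state() != KafkaStreams.State.NOT_RUNNING;
{code}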


was (Author: twbecker):
Any update on this? We're seeing the same behavior, and though there have been 
some changes in this area in trunk, I don't see anything that would fix this 
particular problem. org.apache.kafka.streams.KafkaStreams.StreamStateListener, 
which is called back on state changes of the thread, still doesn't seem to 
care about transitions to terminal states.

> Kafka Streams report state RUNNING even if all threads are dead
> ---
>
> Key: KAFKA-5440
> URL: https://issues.apache.org/jira/browse/KAFKA-5440
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Matthias J. Sax
>
> From the mailing list:
> {quote}
> Hi All,
> We recently implemented a health check for a Kafka Streams based application. 
> The health check is simply checking the state of Kafka Streams by calling 
> KafkaStreams.state(). It reports healthy if it’s not in PENDING_SHUTDOWN or 
> NOT_RUNNING states. 
> We truly appreciate having the possibility to easily check the state of Kafka 
> Streams but to our surprise we noticed that KafkaStreams.state() returns 
> RUNNING even though all StreamThreads have crashed and reached NOT_RUNNING 
> state. Is this intended behaviour or is it a bug? Semantically it seems weird 
> to me that KafkaStreams would say it’s RUNNING when it is in fact not 
> consuming anything since all underlying working threads have crashed. 
> If this is intended behaviour I would appreciate an explanation of why that 
> is the case. Also in that case, how could I determine if the consumption from 
> Kafka hasn’t crashed? 
> If this is not intended behaviour, how fast could I expect it to be fixed? I 
> wouldn’t mind fixing it myself but I’m not sure if this is considered trivial 
> or big enough to require a JIRA. Also, if I would implement a fix I’d like 
> your input on what would be a reasonable solution. By just inspecting the code 
> I have an idea, but I’m not sure I understand all the implications, so I’d be 
> happy to hear your thoughts first. 
> {quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5440) Kafka Streams report state RUNNING even if all threads are dead

2017-08-15 Thread Tommy Becker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16127316#comment-16127316
 ] 

Tommy Becker commented on KAFKA-5440:
-

Any update on this? We're seeing the same behavior, and though there have been 
some changes in this area in trunk, I don't see anything that would fix this 
particular problem. org.apache.kafka.streams.KafkaStreams.StreamStateListener, 
which is called back on state changes of the thread, still doesn't seem to 
care about transitions to terminal states.

> Kafka Streams report state RUNNING even if all threads are dead
> ---
>
> Key: KAFKA-5440
> URL: https://issues.apache.org/jira/browse/KAFKA-5440
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Matthias J. Sax
>
> From the mailing list:
> {quote}
> Hi All,
> We recently implemented a health check for a Kafka Streams based application. 
> The health check is simply checking the state of Kafka Streams by calling 
> KafkaStreams.state(). It reports healthy if it’s not in PENDING_SHUTDOWN or 
> NOT_RUNNING states. 
> We truly appreciate having the possibility to easily check the state of Kafka 
> Streams but to our surprise we noticed that KafkaStreams.state() returns 
> RUNNING even though all StreamThreads have crashed and reached NOT_RUNNING 
> state. Is this intended behaviour or is it a bug? Semantically it seems weird 
> to me that KafkaStreams would say it’s RUNNING when it is in fact not 
> consuming anything since all underlying working threads have crashed. 
> If this is intended behaviour I would appreciate an explanation of why that 
> is the case. Also in that case, how could I determine if the consumption from 
> Kafka hasn’t crashed? 
> If this is not intended behaviour, how fast could I expect it to be fixed? I 
> wouldn’t mind fixing it myself but I’m not sure if this is considered trivial 
> or big enough to require a JIRA. Also, if I would implement a fix I’d like 
> your input on what would be a reasonable solution. By just inspecting the code 
> I have an idea, but I’m not sure I understand all the implications, so I’d be 
> happy to hear your thoughts first. 
> {quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)