[jira] [Resolved] (KAFKA-4329) The order of the parameters for creating the ZkUtils object is reversed

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4329.

Resolution: Fixed

> The order of the parameters for creating the ZkUtils object is reversed
> ---
>
> Key: KAFKA-4329
> URL: https://issues.apache.org/jira/browse/KAFKA-4329
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0, 0.10.0.1
> Environment: software platform
>Reporter: Matt Wang
>Priority: Critical
>  Labels: patch
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When creating the ZkUtils object, the zkSessionTimeOutMs and 
> zkConnectionTimeoutMs parameters are passed in reverse order. Although both 
> parameters default to 6000, the swap causes problems as soon as either value 
> is changed. 
> The pull request is:
> https://github.com/apache/kafka/pull/1646
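
A minimal sketch of why this kind of swap can go unnoticed, and one way to guard against it. The `connect` method below is a hypothetical stand-in, not the real ZkUtils signature, and the wrapper types are an illustrative assumption rather than anything from the linked pull request.

```java
public class TimeoutOrderSketch {
    // Hypothetical factory method; both timeouts are plain ints, so the
    // compiler cannot tell them apart.
    public static String connect(String zkUrl, int sessionTimeoutMs, int connectionTimeoutMs) {
        return "session=" + sessionTimeoutMs + " connection=" + connectionTimeoutMs;
    }

    // Illustrative wrapper types: a swapped call no longer compiles.
    public static final class SessionTimeoutMs {
        final int value;
        public SessionTimeoutMs(int value) { this.value = value; }
    }

    public static final class ConnectionTimeoutMs {
        final int value;
        public ConnectionTimeoutMs(int value) { this.value = value; }
    }

    public static String connectTyped(String zkUrl, SessionTimeoutMs session,
                                      ConnectionTimeoutMs connection) {
        return connect(zkUrl, session.value, connection.value);
    }

    public static void main(String[] args) {
        int zkSessionTimeOutMs = 30000;
        int zkConnectionTimeoutMs = 6000;
        // Reversed arguments still compile, and with equal defaults the
        // mistake would not even be visible at runtime.
        System.out.println(connect("localhost:2181", zkConnectionTimeoutMs, zkSessionTimeOutMs));
        // With wrapper types the intended order is explicit.
        System.out.println(connectTyped("localhost:2181",
                new SessionTimeoutMs(zkSessionTimeOutMs),
                new ConnectionTimeoutMs(zkConnectionTimeoutMs)));
    }
}
```

When both values are 6000 the two calls behave identically, which matches the report's point that the bug only shows once the values are changed.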



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14398) Update EndToEndAuthorizerTest.scala to test with ZK and KRAFT quorum servers

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14398.
-
Resolution: Fixed

> Update EndToEndAuthorizerTest.scala to test with ZK and KRAFT quorum servers
> 
>
> Key: KAFKA-14398
> URL: https://issues.apache.org/jira/browse/KAFKA-14398
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft, unit tests
>Reporter: Proven Provenzano
>Assignee: Proven Provenzano
>Priority: Major
> Fix For: 3.4.0
>
>
> KRaft is a replacement for ZK for storing metadata.
> We should validate that ACLs work with KRaft for the supported authentication 
> mechanisms. 
> I will update EndToEndAuthorizerTest.scala to test with ZK and KRaft.





[jira] [Reopened] (KAFKA-14398) Update EndToEndAuthorizerTest.scala to test with ZK and KRAFT quorum servers

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-14398:
-



[jira] [Reopened] (KAFKA-13372) failed authentication due to: SSL handshake failed

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-13372:
-

> failed authentication due to: SSL handshake failed
> --
>
> Key: KAFKA-13372
> URL: https://issues.apache.org/jira/browse/KAFKA-13372
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.2.2
>Reporter: Maria Isabel Florez Rodriguez
>Priority: Major
>
> Hi everyone,
>  
> I have the following issue with SCRAM + SSL authentication. I'm using the CLI, and 
> this is my client version (./kafka_2.13-2.8.1/bin/kafka-topics.sh). In 
> this example I talk about listing topics, but other operations (consumer, 
> producer) fail too.
>  
>  
> First, let me describe the current scenario:
>  
>  * I have 5 Kafka brokers:
>  * kafka-broker-0.mydomain.com
>  * kafka-broker-1.mydomain.com
>  * kafka-broker-2.mydomain.com
>  * kafka-broker-3.mydomain.com
>  * kafka-broker-4.mydomain.com
>  
>  * I have a main (principal) DNS name configured with round robin across the broker IPs:
>  * kafka-broker-princial.mydomain.com (Round Robin)
>  
>  On each broker I have configured the following listeners (I'm using 3 ports):
> {quote}advertised.listeners=SASL_SSL://kafka-broker-0.mydomain.com:9094,SASL_PLAINTEXT://kafka-broker-0.mydomain.com:9093,PLAINTEXT://kafka-broker-0.mydomain.com:9092{quote}
>  * 9092 for PLAINTEXT
>  * 9093 for SASL_PLAINTEXT
>  * 9094 for SASL_SSL
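
As an illustration of the listener layout above, here is a small sketch that splits an `advertised.listeners` value into a protocol-to-port map. The class and method names are hypothetical, and this is not how the broker itself parses the setting.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ListenerPortsSketch {
    // Splits "PROTO://host:port,PROTO://host:port" into protocol -> port,
    // keeping the order in which the listeners were declared.
    public static Map<String, Integer> portsByProtocol(String listeners) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String entry : listeners.split(",")) {
            String trimmed = entry.trim();
            int schemeEnd = trimmed.indexOf("://");
            int portStart = trimmed.lastIndexOf(':');
            if (schemeEnd < 0 || portStart <= schemeEnd) {
                throw new IllegalArgumentException("Not a PROTO://host:port entry: " + trimmed);
            }
            String protocol = trimmed.substring(0, schemeEnd);
            int port = Integer.parseInt(trimmed.substring(portStart + 1));
            result.put(protocol, port);
        }
        return result;
    }

    public static void main(String[] args) {
        String value = "SASL_SSL://kafka-broker-0.mydomain.com:9094,"
                + "SASL_PLAINTEXT://kafka-broker-0.mydomain.com:9093,"
                + "PLAINTEXT://kafka-broker-0.mydomain.com:9092";
        System.out.println(portsByProtocol(value));
    }
}
```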
>  
> My Kafka brokers use the following server.properties:
> {quote}advertised.listeners=SASL_SSL://kafka-broker-X.mydomain.com:9094,SASL_PLAINTEXT://kafka-broker-X.mydomain.com:9093,PLAINTEXT://kafka-broker-X.mydomain.com:9092
> authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
> auto.create.topics.enable=false
> auto.leader.rebalance.enable=true
> background.threads=10
> broker.id=X
> broker.rack=us-east-1c
> compression.type=producer
> connections.max.idle.ms=270
> controlled.shutdown.enable=true
> delete.topic.enable=true
> host.name=localhost
> leader.imbalance.check.interval.seconds=300
> leader.imbalance.per.broker.percentage=10
> listeners=SASL_SSL://0.0.0.0:9094,SASL_PLAINTEXT://0.0.0.0:9093,PLAINTEXT://0.0.0.0:9092
> log.cleaner.enable=true
> log.dirs=/var/lib/kafka/log/data1,/var/lib/kafka/log/data2,/var/lib/kafka/log/data3
> log.retention.check.interval.ms=30
> log.retention.hours=336
> log.segment.bytes=1073741824
> message.max.bytes=112
> min.insync.replicas=2
> num.io.threads=8
> num.network.threads=3
> num.partitions=3
> num.recovery.threads.per.data.dir=1
> num.replica.fetchers=1
> offset.metadata.max.bytes=4096
> offsets.commit.timeout.ms=5000
> offsets.retention.minutes=129600
> offsets.topic.num.partitions=50
> offsets.topic.replication.factor=3
> port=9092
> queued.max.requests=500
> replica.fetch.min.bytes=1
> replica.fetch.wait.max.ms=500
> sasl.enabled.mechanisms=SCRAM-SHA-256,GSSAPI
> sasl.kerberos.service.name=x
> sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
> security.inter.broker.protocol=SASL_SSL
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> socket.send.buffer.bytes=102400
> ssl.client.auth=required
> ssl.endpoint.identification.algorithm=""
> ssl.enabled.protocols=TLSv1.2
> ssl.key.password=
> ssl.keystore.location=/etc/ssl/default_keystore.jks
> ssl.keystore.password=
> ssl.truststore.location=/usr/lib/jvm/java-11-adoptopenjdk-hotspot/lib/security/cacerts
> ssl.truststore.password= 
> ssl.truststore.type=JKS
> super.users=User:x
> zookeeper.connect=kafka-zk-X.mydomain.com:2181,kafka-zk-X.mydomain.com:2181,kafka-zk-X.mydomain.com:2181,kafka-zk-X.mydomain.com:2181,kafka-zk-X.mydomain.com:218/my-environment
> zookeeper.connection.timeout.ms=6000
> zookeeper.sasl.client=false{quote}
>  
>  
> I tried the following:
>  
>  * (/)*PLAINTEXT:* I can consume by connecting directly to each broker on port 
> *9092* (using the broker IP or DNS name) 
>  * (/)*PLAINTEXT:* I can also consume through the round-robin main DNS name 
> on port *9092*
>  * (/)*SASL_SSL:* I can consume by connecting directly to each broker on port *9094* 
> (using only the broker DNS name, because the certificate must be validated)
>  * (x)*SASL_SSL:* I cannot consume through the round-robin main DNS name 
> on port *9094*
> The issue is: * *(x)SASL_SSL(x):* I cannot consume through the round-robin main DNS 
> name on port *9

[jira] [Reopened] (KAFKA-4329) The order of the parameters for creating the ZkUtils object is reversed

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-4329:




[jira] [Reopened] (KAFKA-10704) Mirror maker with TLS at target

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-10704:
-

> Mirror maker with TLS at target
> ---
>
> Key: KAFKA-10704
> URL: https://issues.apache.org/jira/browse/KAFKA-10704
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.6.0
>Reporter: Tushar Bhasme
>Assignee: Ning Zhang
>Priority: Critical
>
> We need to set up MirrorMaker from a single-node Kafka cluster to a three-node 
> Strimzi cluster. There is no SSL setup at the source; however, the target 
> cluster is configured with mTLS.
> With the config below, commands from the source, such as listing topics, work:
> {code:java}
> cat client-ssl.properties
> security.protocol=SSL
> ssl.truststore.location=my.truststore
> ssl.truststore.password=123456
> ssl.keystore.location=my.keystore
> ssl.keystore.password=123456
> ssl.key.password=password{code}
> However, we are not able to get MirrorMaker working with similar configs:
> {code:java}
> source.security.protocol=PLAINTEXT
> target.security.protocol=SSL
> target.ssl.truststore.location=my.truststore
> target.ssl.truststore.password=123456
> target.ssl.keystore.location=my.keystore
> target.ssl.keystore.password=123456
> target.ssl.key.password=password{code}
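
To make the relationship between the two config snippets concrete, the sketch below strips a `target.` prefix from property names to recover the plain client settings. It only illustrates the naming convention used in the report. The helper is hypothetical, is not MirrorMaker's actual code, and makes no claim about the cause of the failure.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class PrefixedConfigSketch {
    // Returns the properties whose names start with the prefix, with the
    // prefix removed; all other properties are ignored.
    public static Map<String, String> withPrefixStripped(Properties props, String prefix) {
        Map<String, String> out = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(prefix) && name.length() > prefix.length()) {
                out.put(name.substring(prefix.length()), props.getProperty(name));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("source.security.protocol", "PLAINTEXT");
        props.setProperty("target.security.protocol", "SSL");
        props.setProperty("target.ssl.truststore.location", "my.truststore");
        // Prints only the target-side settings, without the prefix.
        System.out.println(withPrefixStripped(props, "target."));
    }
}
```

Comparing the stripped map with the working `client-ssl.properties` is one quick way to confirm that both describe the same SSL settings.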
> Errors while running mirror maker:
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1605011994642, tries=1, nextAllowedTryMs=1605011994743) timed out at 1605011994643 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata
> [2020-11-10 12:40:24,642] INFO App info kafka.admin.client for adminclient-8 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
> [2020-11-10 12:40:24,643] INFO [AdminClient clientId=adminclient-8] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235)
> org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1605012024643, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: fetchMetadata
> [2020-11-10 12:40:24,644] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
> [2020-11-10 12:40:24,644] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
> [2020-11-10 12:40:24,644] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
> [2020-11-10 12:40:24,645] ERROR Stopping due to error (org.apache.kafka.connect.mirror.MirrorMaker:304)
> org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
> at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70)
> at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51)
> at org.apache.kafka.connect.mirror.MirrorMaker.addHerder(MirrorMaker.java:235)
> at org.apache.kafka.connect.mirror.MirrorMaker.lambda$new$1(MirrorMaker.java:136)
> at java.lang.Iterable.forEach(Iterable.java:75)
> at org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:136)
> at org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:148)
> at org.apache.kafka.connect.mirror.MirrorMaker.main(MirrorMaker.java:291)
> Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1605012024641, tries=1, nextAllowedTryMs=1605012024742) timed out at 1605012024642 after 1 attempt(s)
> at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
> ... 7 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=

[jira] [Resolved] (KAFKA-10704) Mirror maker with TLS at target

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-10704.
-
Resolution: Fixed


[jira] [Resolved] (KAFKA-10352) Error while reading checkpoint file /tmp/kafka-logs/cleaner-offset-checkpoint (kafka.server.LogDirFailureChannel)

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-10352.
-
Resolution: Fixed

> Error while reading checkpoint file /tmp/kafka-logs/cleaner-offset-checkpoint 
> (kafka.server.LogDirFailureChannel)
> -
>
> Key: KAFKA-10352
> URL: https://issues.apache.org/jira/browse/KAFKA-10352
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Reporter: Seongbae Chang
>Priority: Critical
>
> One of my Kafka brokers (3 in total, version 2.5.0) shut down suddenly. 
> The other brokers then also shut down for similar reasons.
>  
> The main cause of this problem is '*Error while reading checkpoint file 
> /tmp/kafka-logs/cleaner-offset-checkpoint (kafka.server.LogDirFailureChannel)*
> *java.nio.file.NoSuchFileException: 
> /tmp/kafka-logs/cleaner-offset-checkpoint*'
>  
> I don't know why this error occurs or how to solve it. Please give me 
> some answers or comments about it. Thank you.
> I have attached the contents of the log files kafkaServer.out and 
> log-cleaner.log.
>  
> kafkaServer.out
> {code:java}
> [2020-07-30 19:49:05,992] INFO [GroupMetadataManager brokerId=3] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
> [2020-07-30 19:49:05,992] INFO [GroupMetadataManager brokerId=3] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
> [2020-07-30 19:56:48,080] ERROR Error while reading checkpoint file /tmp/kafka-logs/cleaner-offset-checkpoint (kafka.server.LogDirFailureChannel)
> java.nio.file.NoSuchFileException: /tmp/kafka-logs/cleaner-offset-checkpoint
> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
> at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
> at java.nio.file.Files.newByteChannel(Files.java:361)
> at java.nio.file.Files.newByteChannel(Files.java:407)
> at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
> at java.nio.file.Files.newInputStream(Files.java:152)
> at java.nio.file.Files.newBufferedReader(Files.java:2784)
> at java.nio.file.Files.newBufferedReader(Files.java:2816)
> at kafka.server.checkpoints.CheckpointFile.liftedTree2$1(CheckpointFile.scala:87)
> at kafka.server.checkpoints.CheckpointFile.read(CheckpointFile.scala:86)
> at kafka.server.checkpoints.OffsetCheckpointFile.read(OffsetCheckpointFile.scala:61)
> at kafka.log.LogCleanerManager.$anonfun$allCleanerCheckpoints$2(LogCleanerManager.scala:134)
> at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:583)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:597)
> at scala.collection.mutable.ListBuffer.addAll(ListBuffer.scala:118)
> at scala.collection.mutable.ListBuffer$.from(ListBuffer.scala:38)
> at scala.collection.immutable.List$.from(List.scala:617)
> at scala.collection.immutable.List$.from(List.scala:611)
> at scala.collection.IterableFactory$Delegate.from(Factory.scala:288)
> at scala.collection.immutable.Iterable$.from(Iterable.scala:35)
> at scala.collection.immutable.Iterable$.from(Iterable.scala:32)
> at scala.collection.IterableFactory$Delegate.from(Factory.scala:288)
> at scala.collection.IterableOps.flatMap(Iterable.scala:674)
> at scala.collection.IterableOps.flatMap$(Iterable.scala:674)
> at scala.collection.AbstractIterable.flatMap(Iterable.scala:921)
> at kafka.log.LogCleanerManager.$anonfun$allCleanerCheckpoints$1(LogCleanerManager.scala:132)
> at kafka.log.LogCleanerManager.allCleanerCheckpoints(LogCleanerManager.scala:140)
> at kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$1(LogCleanerManager.scala:171)
> at kafka.log.LogCleanerManager.grabFilthiestCompactedLog(LogCleanerManager.scala:168)
> at kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:327)
> at kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:314)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:303)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
> [2020-07-30 19:56:48,083] WARN [ReplicaManager broker=3] Stopping serving replicas in dir /tmp/kafka-logs (kafka.server.ReplicaManager)
> [2020-07-30 19:56:48,086] INFO [ReplicaFetcherManager on broker 3] Remov

[jira] [Assigned] (KAFKA-5870) Idempotent producer: a producerId reset causes undesirable behavior for inflight batches to other partitions

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-5870:
--

Assignee: (was: Apurva Mehta)

> Idempotent producer: a producerId reset causes undesirable behavior for 
> inflight batches to other partitions
> 
>
> Key: KAFKA-5870
> URL: https://issues.apache.org/jira/browse/KAFKA-5870
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
>Reporter: Apurva Mehta
>Priority: Major
>  Labels: exactly-once
>
> Currently, if we have to reset the producer id for any reason (for instance, 
> if batches to a partition expire, or if we get an 
> {{OutOfOrderSequenceException}}), we could cause batches to other 
> --healthy-- partitions to fail with a spurious 
> {{OutOfOrderSequenceException}}.
> This is detailed in this PR discussion: 
> https://github.com/apache/kafka/pull/3743#discussion_r137907630
> Ideally, we would want all inflight batches to be handled to completion 
> rather than potentially failing them prematurely. Further, since we want to 
> tighten up the semantics of the {{OutOfOrderSequenceException}}, at the very 
> least we should raise another exception in this case, because there is no 
> data loss on the broker when the client gives up. 





[jira] [Assigned] (KAFKA-5543) We don't remove the LastStableOffsetLag metric when a partition is moved away from a broker

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-5543:
--

Assignee: (was: Apurva Mehta)

> We don't remove the LastStableOffsetLag metric when a partition is moved away 
> from a broker
> ---
>
> Key: KAFKA-5543
> URL: https://issues.apache.org/jira/browse/KAFKA-5543
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
>Reporter: Apurva Mehta
>Priority: Major
>
> Reported by [~junrao], we have a small leak where the `LastStableOffsetLag` 
> metric is not removed along with the other metrics in the 
> `Partition.removeMetrics` method. This could create a leak when partitions 
> are reassigned or a topic is deleted.
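
A minimal sketch of how a leak of this kind can arise, written in plain Java with no Kafka dependency. The class `PartitionMetricsSketch`, its methods, and the gauge names other than `LastStableOffsetLag` are hypothetical and only illustrate the pattern described in the report; this is not Kafka's actual `Partition` code.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for a per-partition metrics registry; not Kafka's actual code.
class PartitionMetricsSketch {
    // Gauges keyed by "<gaugeName>.<topicPartition>".
    static final Map<String, Supplier<Long>> REGISTRY = new ConcurrentHashMap<>();

    // Illustrative gauge names; only LastStableOffsetLag is taken from the report.
    static final List<String> GAUGES =
        List.of("UnderReplicated", "InSyncReplicasCount", "LastStableOffsetLag");

    static void registerMetrics(String topicPartition) {
        for (String gauge : GAUGES) {
            REGISTRY.put(gauge + "." + topicPartition, () -> 0L);
        }
    }

    // Leaky variant: one gauge is forgotten, so an entry survives every removal.
    static void removeMetricsLeaky(String topicPartition) {
        REGISTRY.remove("UnderReplicated." + topicPartition);
        REGISTRY.remove("InSyncReplicasCount." + topicPartition);
    }

    // Symmetric variant: removes everything that registerMetrics added.
    static void removeMetrics(String topicPartition) {
        for (String gauge : GAUGES) {
            REGISTRY.remove(gauge + "." + topicPartition);
        }
    }
}
```

Driving removal from the same list that drives registration keeps the two methods from drifting apart when a new gauge is added.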





[jira] [Assigned] (KAFKA-2939) Make AbstractConfig.logUnused() tunable for clients

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-2939:
--

Assignee: (was: Mani Jindal)

> Make AbstractConfig.logUnused() tunable for clients
> ---
>
> Key: KAFKA-2939
> URL: https://issues.apache.org/jira/browse/KAFKA-2939
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie
>
> Today we always log unused configs in the KafkaProducer / KafkaConsumer 
> constructors. However, in some cases, such as Kafka Streams, which makes use 
> of these clients, other configs may be passed in to configure Partitioner / 
> Serializer classes, etc. It would therefore be better to make this function 
> call optional, to avoid printing unnecessary and confusing WARN entries.
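
The request above can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the `AbstractConfig` implementation: the class `TrackedConfigSketch`, the `warnOnUnused` switch, and the log wording are all hypothetical.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of a config object that remembers which keys were read.
class TrackedConfigSketch {
    private final Map<String, String> values;
    private final Set<String> used = new HashSet<>();
    private final boolean warnOnUnused; // hypothetical switch, not an existing Kafka config

    TrackedConfigSketch(Map<String, String> values, boolean warnOnUnused) {
        this.values = values;
        this.warnOnUnused = warnOnUnused;
    }

    // Reading a key marks it as used.
    String get(String key) {
        used.add(key);
        return values.get(key);
    }

    // Keys that were supplied but never read, in sorted order.
    Set<String> unused() {
        Set<String> keys = new TreeSet<>(values.keySet());
        keys.removeAll(used);
        return keys;
    }

    // Logs one WARN line per unused key and returns how many lines were logged.
    // When the switch is off, nothing is logged.
    int logUnused() {
        if (!warnOnUnused) {
            return 0;
        }
        Set<String> keys = unused();
        for (String key : keys) {
            System.out.println("WARN unused configuration: " + key);
        }
        return keys.size();
    }
}
```

A wrapper such as Kafka Streams could then construct the client config with the switch off when it knows that extra keys are meant for pluggable classes.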





[jira] [Assigned] (KAFKA-5053) Send Error messages along with FindCoordinatorResponse

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-5053:
--

Assignee: (was: Apurva Mehta)

> Send Error messages along with FindCoordinatorResponse
> --
>
> Key: KAFKA-5053
> URL: https://issues.apache.org/jira/browse/KAFKA-5053
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Apurva Mehta
>Priority: Major
>
> As mentioned in 
> https://github.com/apache/kafka/pull/2825#discussion_r110535855, currently it 
> is hard to debug issues when we get an error in the 
> `FindCoordinatorResponse`. It would help if the server populated the response 
> with a suitable message.
> With the transaction coordinator, this need becomes more acute, so we should 
> try to emulate the CreateTopicsResponse and return a usable message along 
> with the error.





[jira] [Resolved] (KAFKA-14717) KafkaStreams can't get running if the rebalance happens before StreamThread gets shutdown completely

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14717.
-
Resolution: Fixed

> KafkaStreams can't get running if the rebalance happens before StreamThread 
> gets shutdown completely
> ---
>
> Key: KAFKA-14717
> URL: https://issues.apache.org/jira/browse/KAFKA-14717
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 3.5.0
>
>
> I noticed this issue when tracing KAFKA-7109.
> StreamThread closes the consumer before changing its state to DEAD. If the 
> partition rebalance happens quickly, the other StreamThreads can't change the 
> KafkaStreams state from REBALANCING to RUNNING, since there is still a 
> PENDING_SHUTDOWN StreamThread.
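
The race described above can be modelled with a small, self-contained Java sketch. It is a simplification under stated assumptions: the class `StreamsStateSketch` and both methods are hypothetical, the first rule is only one reading of the report (DEAD threads are ignored, PENDING_SHUTDOWN threads are not), and the second rule is one possible remedy rather than a description of the actual fix.

```java
import java.util.List;

// Hypothetical model of the application-level state decision; not Kafka's code.
class StreamsStateSketch {
    // A subset of the stream thread states, enough to show the race.
    enum ThreadState { PARTITIONS_ASSIGNED, RUNNING, PENDING_SHUTDOWN, DEAD }

    // Rule as read from the report: the application may move from REBALANCING
    // to RUNNING only if every thread is RUNNING or already DEAD.
    static boolean mayRunAsReported(List<ThreadState> threads) {
        return threads.stream()
            .allMatch(s -> s == ThreadState.RUNNING || s == ThreadState.DEAD);
    }

    // Assumed remedy: a thread that is shutting down is treated like a DEAD one.
    static boolean mayRunIgnoringShutdown(List<ThreadState> threads) {
        return threads.stream()
            .allMatch(s -> s == ThreadState.RUNNING
                || s == ThreadState.DEAD
                || s == ThreadState.PENDING_SHUTDOWN);
    }
}
```

With one RUNNING thread and one PENDING_SHUTDOWN thread, the first rule keeps the application in REBALANCING while the second lets it proceed; a thread still in PARTITIONS_ASSIGNED blocks both.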





[jira] [Resolved] (KAFKA-4706) Unify StreamsKafkaClient instances

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4706.

Resolution: Fixed

> Unify StreamsKafkaClient instances
> --
>
> Key: KAFKA-4706
> URL: https://issues.apache.org/jira/browse/KAFKA-4706
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>        Reporter: Matthias J. Sax
>Assignee: Sharad
>Priority: Minor
>  Labels: beginner, easyfix, newbie
> Fix For: 1.1.0
>
>
> Kafka Streams currently uses two instances of {{StreamsKafkaClient}} (one in 
> {{KafkaStreams}} and one in {{InternalTopicManager}}).
> We want to unify both such that only a single instance is used.





[jira] [Resolved] (KAFKA-14442) GlobalKTable restoration waits requestTimeout during application restart

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14442.
-
Resolution: Fixed

> GlobalKTable restoration waits requestTimeout during application restart
> 
>
> Key: KAFKA-14442
> URL: https://issues.apache.org/jira/browse/KAFKA-14442
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Gergo L��p
>Priority: Major
> Fix For: 3.2.0
>
>
> Using "exactly_once_beta", the highWatermark "skips" an offset after a 
> transaction, but in this case the global .checkpoint file contains a different 
> value (smaller by 1) than the highWatermark.
> During restoration, because of the difference between the checkpoint and the 
> highWatermark, a poll will be attempted, but sometimes there is no new record 
> on the partition and the GlobalStreamThread has to wait for the 
> requestTimeout to continue.
> If there is any new record on the partition, the problem does not occur.
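
The off-by-one described above can be illustrated with a small model in plain Java. It assumes that the "skipped" offset is occupied by a transaction marker, which takes up an offset but is never returned to the application as a record. The class `GlobalRestoreSketch` and its method are hypothetical and do not reproduce the actual restoration code.

```java
import java.util.Set;

// Simplified model of the restoration check; names are hypothetical.
class GlobalRestoreSketch {
    /**
     * Returns true when restoration would poll even though no data record exists
     * between the checkpoint (inclusive) and the highWatermark (exclusive),
     * for example because the only entry in that range is a transaction marker.
     */
    static boolean pollsWithoutData(long checkpoint, long highWatermark, Set<Long> dataOffsets) {
        if (checkpoint >= highWatermark) {
            return false; // already caught up, so no poll is attempted
        }
        for (long offset = checkpoint; offset < highWatermark; offset++) {
            if (dataOffsets.contains(offset)) {
                return false; // a record is available, so the poll returns promptly
            }
        }
        return true; // nothing to return: the poll waits until the timeout expires
    }
}
```

For example, with data records at offsets 0 to 4, a marker at offset 5, a highWatermark of 6, and a checkpoint of 5, the method returns true, which corresponds to the stalled poll in the report. Once a new record lands at offset 6 and the highWatermark moves to 7, it returns false.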





[jira] [Reopened] (KAFKA-14442) GlobalKTable restoration waits requestTimeout during application restart

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-14442:
-

> GlobalKTable restoration waits requestTimeout during application restart
> 
>
> Key: KAFKA-14442
> URL: https://issues.apache.org/jira/browse/KAFKA-14442
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Gergo L��p
>Priority: Major
> Fix For: 3.2.0
>
>
> Using "exactly_once_beta", the highWatermark "skips" an offset after a 
> transaction, but in this case the global .checkpoint file contains a different 
> value (smaller by 1) than the highWatermark.
> During restoration, because of the difference between the checkpoint and the 
> highWatermark, a poll will be attempted, but sometimes there is no new record 
> on the partition and the GlobalStreamThread has to wait for the 
> requestTimeout to continue.
> If there is any new record on the partition, the problem does not occur.





[jira] [Reopened] (KAFKA-9250) Kafka streams stuck in rebalancing state after UnknownHostException

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-9250:


> Kafka streams stuck in rebalancing state after UnknownHostException
> ---
>
> Key: KAFKA-9250
> URL: https://issues.apache.org/jira/browse/KAFKA-9250
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, config, network, streams
>Affects Versions: 2.2.0
>Reporter: Vijay
>Priority: Critical
>
> We are using a Kafka Streams (2.2.0) application to read messages from a 
> source topic, do some transformation, and send them to a destination topic. 
> The application started fine and processed messages until it encountered an 
> UnknownHostException, after which the application hung in the rebalancing 
> state and stopped processing messages.
>  
> Below are the properties we have configured:
> application.id = *
> bootstrap.servers = "hostname1:port1,hostname2:port2,hostname3:port3"
> num.stream.threads=3
> replication.factor=3
> num.standby.replicas=1
> max.block.ms=8640
> acks=all
> auto.offset.reset=earliest
> processing.guarantee=exactly_once
>  
> Additional details.
> Number of brokers - 3
> Source topic partition count - 12 and replication factor of 3
> Destination topic partition count - 12 and replication factor of 3
> 4 instances of stream application are deployed in docker containers.
>  
> Below are some of the logs:
> {noformat}
> [WARN] [streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1] 
> o.a.k.clients.NetworkClient - [Consumer 
> clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1-consumer,
>  groupId=streams-example] Error connecting to node hostname1:port1 (id: 
> 2147438464 rack: null)
> java.net.UnknownHostException: hostname1 
> at java.net.InetAddress.getAllByName0(InetAddress.java:1280) 
> at java.net.InetAddress.getAllByName(InetAddress.java:1192) 
> at java.net.InetAddress.getAllByName(InetAddress.java:1126) 
> at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:117) 
> at 
> org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.moveToNextAddress(ClusterConnectionStates.java:387)
>  
> at 
> org.apache.kafka.clients.ClusterConnectionStates.connecting(ClusterConnectionStates.java:121)
>  
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:917)
>  
> at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287) 
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:548)
>  
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:676)
>  
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:656)
>  
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>  
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>  
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>  
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317)
>  
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
>  
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941)
>  
> at 
> org.apache.kafka

[jira] [Reopened] (KAFKA-3576) Unify KStream and KTable API

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-3576:


> Unify KStream and KTable API
> 
>
> Key: KAFKA-3576
> URL: https://issues.apache.org/jira/browse/KAFKA-3576
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>            Reporter: Matthias J. Sax
>Assignee: Damian Guy
>Priority: Major
>  Labels: api
> Fix For: 0.10.1.0
>
>
> KTable aggregations follow the pattern 
> {{table.groupBy(...).aggregate(...)}}, and the data is repartitioned in an 
> internal topic based on the key selected in {{groupBy(...)}}.
> KStream aggregations, though, follow the pattern 
> {{stream.selectKey(...).through(...).aggregateByKey(...)}}. In other words, 
> users need to manually use a topic to repartition data, and the syntax is a 
> bit different from KTable as well.
> h2. Goal
> To have similar APIs for aggregations of KStream and KTable





[jira] [Resolved] (KAFKA-3576) Unify KStream and KTable API

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-3576.

Resolution: Fixed

> Unify KStream and KTable API
> 
>
> Key: KAFKA-3576
> URL: https://issues.apache.org/jira/browse/KAFKA-3576
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>            Reporter: Matthias J. Sax
>Assignee: Damian Guy
>Priority: Major
>  Labels: api
> Fix For: 0.10.1.0
>
>
> KTable aggregations follow the pattern 
> {{table.groupBy(...).aggregate(...)}}, and the data is repartitioned in an 
> internal topic based on the key selected in {{groupBy(...)}}.
> KStream aggregations, though, follow the pattern 
> {{stream.selectKey(...).through(...).aggregateByKey(...)}}. In other words, 
> users need to manually use a topic to repartition data, and the syntax is a 
> bit different from KTable as well.
> h2. Goal
> To have similar APIs for aggregations of KStream and KTable





[jira] [Reopened] (KAFKA-13599) Upgrade RocksDB to 6.27.3

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-13599:
-

> Upgrade RocksDB to 6.27.3
> -
>
> Key: KAFKA-13599
> URL: https://issues.apache.org/jira/browse/KAFKA-13599
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Jonathan Albrecht
>Assignee: Jonathan Albrecht
>Priority: Major
> Fix For: 3.2.0
>
> Attachments: compat_report.html
>
>
> RocksDB v6.27.3 has been released and it is the first release to support 
> s390x. RocksDB is currently the only dependency in gradle/dependencies.gradle 
> without s390x support.
> RocksDB v6.27.3 has added some new options that require an update to 
> streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
>  but no other changes are needed to upgrade.
> A compatibility report is attached for the current version 6.22.1.1 -> 6.27.3





[jira] [Reopened] (KAFKA-4706) Unify StreamsKafkaClient instances

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-4706:


> Unify StreamsKafkaClient instances
> --
>
> Key: KAFKA-4706
> URL: https://issues.apache.org/jira/browse/KAFKA-4706
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>        Reporter: Matthias J. Sax
>Assignee: Sharad
>Priority: Minor
>  Labels: beginner, easyfix, newbie
> Fix For: 1.1.0
>
>
> Kafka Streams currently uses two instances of {{StreamsKafkaClient}} (one in 
> {{KafkaStreams}} and one in {{InternalTopicManager}}).
> We want to unify both such that only a single instance is used.





[jira] [Resolved] (KAFKA-13599) Upgrade RocksDB to 6.27.3

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-13599.
-
Resolution: Fixed

> Upgrade RocksDB to 6.27.3
> -
>
> Key: KAFKA-13599
> URL: https://issues.apache.org/jira/browse/KAFKA-13599
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Jonathan Albrecht
>Assignee: Jonathan Albrecht
>Priority: Major
> Fix For: 3.2.0
>
> Attachments: compat_report.html
>
>
> RocksDB v6.27.3 has been released and it is the first release to support 
> s390x. RocksDB is currently the only dependency in gradle/dependencies.gradle 
> without s390x support.
> RocksDB v6.27.3 has added some new options that require an update to 
> streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
>  but no other changes are needed to upgrade.
> A compatibility report is attached for the current version 6.22.1.1 -> 6.27.3





[jira] [Resolved] (KAFKA-9250) Kafka streams stuck in rebalancing state after UnknownHostException

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9250.

Resolution: Fixed

> Kafka streams stuck in rebalancing state after UnknownHostException
> ---
>
> Key: KAFKA-9250
> URL: https://issues.apache.org/jira/browse/KAFKA-9250
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, config, network, streams
>Affects Versions: 2.2.0
>Reporter: Vijay
>Priority: Critical
>
> We are using a Kafka Streams (2.2.0) application to read messages from a 
> source topic, do some transformation, and send them to a destination topic. 
> The application started fine and processed messages until it encountered an 
> UnknownHostException, after which the application hung in the rebalancing 
> state and stopped processing messages.
>  
> Below are the properties we have configured:
> application.id = *
> bootstrap.servers = "hostname1:port1,hostname2:port2,hostname3:port3"
> num.stream.threads=3
> replication.factor=3
> num.standby.replicas=1
> max.block.ms=8640
> acks=all
> auto.offset.reset=earliest
> processing.guarantee=exactly_once
>  
> Additional details.
> Number of brokers - 3
> Source topic partition count - 12 and replication factor of 3
> Destination topic partition count - 12 and replication factor of 3
> 4 instances of stream application are deployed in docker containers.
>  
> Below are some of the logs:
> {noformat}
> [WARN] [streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1] 
> o.a.k.clients.NetworkClient - [Consumer 
> clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1-consumer,
>  groupId=streams-example] Error connecting to node hostname1:port1 (id: 
> 2147438464 rack: null)
> java.net.UnknownHostException: hostname1 
> at java.net.InetAddress.getAllByName0(InetAddress.java:1280) 
> at java.net.InetAddress.getAllByName(InetAddress.java:1192) 
> at java.net.InetAddress.getAllByName(InetAddress.java:1126) 
> at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:117) 
> at 
> org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.moveToNextAddress(ClusterConnectionStates.java:387)
>  
> at 
> org.apache.kafka.clients.ClusterConnectionStates.connecting(ClusterConnectionStates.java:121)
>  
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:917)
>  
> at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287) 
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:548)
>  
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:676)
>  
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:656)
>  
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>  
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>  
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>  
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317)
>  
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
>  
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941)
>  
> at 
>

[jira] [Reopened] (KAFKA-14442) GlobalKTable restoration waits requestTimeout during application restart

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-14442:
-

> GlobalKTable restoration waits requestTimeout during application restart
> 
>
> Key: KAFKA-14442
> URL: https://issues.apache.org/jira/browse/KAFKA-14442
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Gergo L��p
>Priority: Major
> Fix For: 3.2.0
>
>
> Using "exactly_once_beta", the highWatermark "skips" an offset after a 
> transaction, but in this case the global .checkpoint file contains a 
> different value (smaller by 1) than the highWatermark.
> During restoration, because of the difference between the checkpoint and the 
> highWatermark, a poll will be attempted, but sometimes there is no new record 
> on the partition and the GlobalStreamThread has to wait for the 
> requestTimeout to continue.
> If there is a new record on the partition, the problem does not occur.
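The off-by-one described above can be modelled in a few lines. This is a simplified sketch of the reported symptom, not the GlobalStreamThread implementation: the class, the method names, and the offsets are hypothetical, and it assumes that the transaction commit marker occupies the offset between the last data record and the highWatermark.

```java
class GlobalRestoreSketch {

    // Restoration keeps polling while the checkpoint is behind the high watermark.
    static boolean restoreWouldPoll(long checkpointOffset, long highWatermark) {
        return checkpointOffset < highWatermark;
    }

    // A poll can only return data if some data record sits at or after the checkpoint.
    static boolean pollCanReturnRecords(long checkpointOffset, long lastDataRecordOffset) {
        return lastDataRecordOffset >= checkpointOffset;
    }

    // True when the restore loop polls although nothing can be fetched,
    // which is the case in which the thread waits for the request timeout.
    static boolean waitsForTimeout(long checkpointOffset, long highWatermark, long lastDataRecordOffset) {
        return restoreWouldPoll(checkpointOffset, highWatermark)
                && !pollCanReturnRecords(checkpointOffset, lastDataRecordOffset);
    }

    public static void main(String[] args) {
        // Assumed layout: data record at offset 4, commit marker at offset 5.
        // checkpoint = 5 and highWatermark = 6, so the loop polls but finds nothing.
        System.out.println(waitsForTimeout(5, 6, 4));
        // A new data record at offset 6 moves the highWatermark to 7 and the poll returns data.
        System.out.println(waitsForTimeout(5, 7, 6));
    }
}
```

In this model the wait disappears either when the checkpoint equals the highWatermark or when a later record arrives, which matches the two cases named in the report.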





[jira] [Reopened] (KAFKA-9250) Kafka streams stuck in rebalancing state after UnknownHostException

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-9250:


> Kafka streams stuck in rebalancing state after UnknownHostException
> ---
>
> Key: KAFKA-9250
> URL: https://issues.apache.org/jira/browse/KAFKA-9250
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, config, network, streams
>Affects Versions: 2.2.0
>Reporter: Vijay
>Priority: Critical
>
> We are using a Kafka Streams (2.2.0) application to read messages from a 
> source topic, do some transformation, and send them to a destination topic. 
> The application started fine and processed messages until it encountered an 
> UnknownHostException, after which it hung in the rebalancing state and 
> stopped processing messages.
>  
> Below are the properties we have configured:
> application.id = *
> bootstrap.servers = "hostname1:port1,hostname2:port2,hostname3:port3"
> num.stream.threads=3
> replication.factor=3
> num.standby.replicas=1
> max.block.ms=8640
> acks=all
> auto.offset.reset=earliest
> processing.guarantee=exactly_once
>  
> Additional details.
> Number of brokers - 3
> Source topic partition count - 12 and replication factor of 3
> Destination topic partition count - 12 and replication factor of 3
> 4 instances of stream application are deployed in docker containers.
>  
> Below are some of the logs:
> {noformat}
> [WARN] [streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1] 
> o.a.k.clients.NetworkClient - [Consumer 
> clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1-consumer,
>  groupId=streams-example] Error connecting to node hostname1:port1 (id: 
> 2147438464 rack: null)
> java.net.UnknownHostException: hostname1 
> at java.net.InetAddress.getAllByName0(InetAddress.java:1280) 
> at java.net.InetAddress.getAllByName(InetAddress.java:1192) 
> at java.net.InetAddress.getAllByName(InetAddress.java:1126) 
> at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:117) 
> at 
> org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.moveToNextAddress(ClusterConnectionStates.java:387)
>  
> at 
> org.apache.kafka.clients.ClusterConnectionStates.connecting(ClusterConnectionStates.java:121)
>  
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:917)
>  
> at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287) 
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:548)
>  
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:676)
>  
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:656)
>  
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>  
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>  
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>  
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317)
>  
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
>  
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941)
>  
> at 
> org.apache.kafka

[jira] [Resolved] (KAFKA-9250) Kafka streams stuck in rebalancing state after UnknownHostException

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9250.

Resolution: Fixed

> Kafka streams stuck in rebalancing state after UnknownHostException
> ---
>
> Key: KAFKA-9250
> URL: https://issues.apache.org/jira/browse/KAFKA-9250
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, config, network, streams
>Affects Versions: 2.2.0
>Reporter: Vijay
>Priority: Critical
>
> We are using a Kafka Streams (2.2.0) application to read messages from a 
> source topic, do some transformation, and send them to a destination topic. 
> The application started fine and processed messages until it encountered an 
> UnknownHostException, after which it hung in the rebalancing state and 
> stopped processing messages.
>  
> Below are the properties we have configured:
> application.id = *
> bootstrap.servers = "hostname1:port1,hostname2:port2,hostname3:port3"
> num.stream.threads=3
> replication.factor=3
> num.standby.replicas=1
> max.block.ms=8640
> acks=all
> auto.offset.reset=earliest
> processing.guarantee=exactly_once
>  
> Additional details.
> Number of brokers - 3
> Source topic partition count - 12 and replication factor of 3
> Destination topic partition count - 12 and replication factor of 3
> 4 instances of stream application are deployed in docker containers.
>  
> Below are some of the logs:
> {noformat}
> [WARN] [streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1] 
> o.a.k.clients.NetworkClient - [Consumer 
> clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1-consumer,
>  groupId=streams-example] Error connecting to node hostname1:port1 (id: 
> 2147438464 rack: null)
> java.net.UnknownHostException: hostname1 
> at java.net.InetAddress.getAllByName0(InetAddress.java:1280) 
> at java.net.InetAddress.getAllByName(InetAddress.java:1192) 
> at java.net.InetAddress.getAllByName(InetAddress.java:1126) 
> at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:117) 
> at 
> org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.moveToNextAddress(ClusterConnectionStates.java:387)
>  
> at 
> org.apache.kafka.clients.ClusterConnectionStates.connecting(ClusterConnectionStates.java:121)
>  
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:917)
>  
> at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287) 
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:548)
>  
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:676)
>  
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:656)
>  
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>  
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>  
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>  
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317)
>  
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
>  
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941)
>  
> at 
>

[jira] [Commented] (KAFKA-14747) FK join should record discarded subscription responses

2023-02-24 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693312#comment-17693312
 ] 

Matthias J. Sax commented on KAFKA-14747:
-

Talking to [~guozhang], it seems we don't need a KIP :) – I removed the label.

> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>            Reporter: Matthias J. Sax
>Assignee: Koma Zhang
>Priority: Minor
>  Labels: beginner, newbie
>
> FK-joins are subject to a race condition: If the left-hand side record is 
> updated, a subscription is sent to the right-hand side (including a hash 
> value of the left-hand side record), and the right-hand side might send back 
> join responses (also including the original hash). The left-hand side only 
> processes the responses if the returned hash matches the current hash of the 
> left-hand side record, because a different hash implies that the left-hand 
> side record was updated in the meantime (including sending a new 
> subscription to the right-hand side), and thus the data is stale and the 
> response should not be processed (joining the response to the new record 
> could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a 
> response, which might be dropped if the left-hand side record was updated in 
> parallel.
> While the behavior is correct, we don't record if this happens. We should 
> consider recording this using the existing "dropped record" sensor or maybe 
> adding a new sensor.





[jira] [Updated] (KAFKA-14747) FK join should record discarded subscription responses

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14747:

Labels: beginner newbie  (was: beginner needs-kip newbie)

> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>            Reporter: Matthias J. Sax
>Assignee: Koma Zhang
>Priority: Minor
>  Labels: beginner, newbie
>
> FK-joins are subject to a race condition: If the left-hand side record is 
> updated, a subscription is sent to the right-hand side (including a hash 
> value of the left-hand side record), and the right-hand side might send back 
> join responses (also including the original hash). The left-hand side only 
> processes the responses if the returned hash matches the current hash of the 
> left-hand side record, because a different hash implies that the left-hand 
> side record was updated in the meantime (including sending a new 
> subscription to the right-hand side), and thus the data is stale and the 
> response should not be processed (joining the response to the new record 
> could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a 
> response, which might be dropped if the left-hand side record was updated in 
> parallel.
> While the behavior is correct, we don't record if this happens. We should 
> consider recording this using the existing "dropped record" sensor or maybe 
> adding a new sensor.





[jira] [Commented] (KAFKA-14747) FK join should record discarded subscription responses

2023-02-24 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693060#comment-17693060
 ] 

Matthias J. Sax commented on KAFKA-14747:
-

Sure. Not sure if we need a KIP or not. [~guozhang] WDYT?

> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>            Reporter: Matthias J. Sax
>Assignee: Koma Zhang
>Priority: Minor
>  Labels: beginner, needs-kip, newbie
>
> FK-joins are subject to a race condition: If the left-hand side record is 
> updated, a subscription is sent to the right-hand side (including a hash 
> value of the left-hand side record), and the right-hand side might send back 
> join responses (also including the original hash). The left-hand side only 
> processes the responses if the returned hash matches the current hash of the 
> left-hand side record, because a different hash implies that the left-hand 
> side record was updated in the meantime (including sending a new 
> subscription to the right-hand side), and thus the data is stale and the 
> response should not be processed (joining the response to the new record 
> could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a 
> response, which might be dropped if the left-hand side record was updated in 
> parallel.
> While the behavior is correct, we don't record if this happens. We should 
> consider recording this using the existing "dropped record" sensor or maybe 
> adding a new sensor.





[jira] [Assigned] (KAFKA-14747) FK join should record discarded subscription responses

2023-02-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-14747:
---

Assignee: Koma Zhang

> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>            Reporter: Matthias J. Sax
>Assignee: Koma Zhang
>Priority: Minor
>  Labels: beginner, needs-kip, newbie
>
> FK-joins are subject to a race condition: If the left-hand side record is 
> updated, a subscription is sent to the right-hand side (including a hash 
> value of the left-hand side record), and the right-hand side might send back 
> join responses (also including the original hash). The left-hand side only 
> processes the responses if the returned hash matches the current hash of the 
> left-hand side record, because a different hash implies that the left-hand 
> side record was updated in the meantime (including sending a new 
> subscription to the right-hand side), and thus the data is stale and the 
> response should not be processed (joining the response to the new record 
> could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a 
> response, which might be dropped if the left-hand side record was updated in 
> parallel.
> While the behavior is correct, we don't record if this happens. We should 
> consider recording this using the existing "dropped record" sensor or maybe 
> adding a new sensor.





[jira] [Created] (KAFKA-14748) Relax non-null FK left-join requirement

2023-02-23 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-14748:
---

 Summary: Relax non-null FK left-join requirement
 Key: KAFKA-14748
 URL: https://issues.apache.org/jira/browse/KAFKA-14748
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams enforces a strict non-null-key policy in the DSL across all 
key-dependent operations (like aggregations and joins).

This also applies to FK-joins, in particular to the ForeignKeyExtractor. If it 
returns `null`, it's treated as invalid. For left-joins, it might make sense to 
still accept a `null`, and add the left-hand record with an empty 
right-hand-side to the result.
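The difference between the strict policy and the proposed relaxation can be sketched as follows. This is an illustration only and does not use the Kafka Streams API: the class, the `leftJoin` helper, the `acceptNullForeignKey` flag, and the string-based join result are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class NullFkLeftJoinSketch {

    // Returns the joined value, or null when the left record is dropped.
    static String leftJoin(String leftValue,
                           Function<String, String> foreignKeyExtractor,
                           Map<String, String> rightTable,
                           boolean acceptNullForeignKey) {
        String foreignKey = foreignKeyExtractor.apply(leftValue);
        if (foreignKey == null) {
            // Strict policy: the record is treated as invalid and dropped.
            // Relaxed policy (the proposal): keep the left record with an empty right side.
            return acceptNullForeignKey ? leftValue + " + <no right side>" : null;
        }
        String rightValue = rightTable.get(foreignKey);
        return leftValue + " + " + (rightValue == null ? "<no right side>" : rightValue);
    }

    static String demo(boolean acceptNullForeignKey) {
        Map<String, String> customers = new HashMap<>();
        customers.put("c1", "Alice");
        // The extractor finds no foreign key for this record.
        return leftJoin("order-42", value -> null, customers, acceptNullForeignKey);
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // strict: record dropped
        System.out.println(demo(true));  // relaxed: left record kept
    }
}
```

With the relaxed flag, a `null` foreign key behaves like a foreign key with no match on the right-hand side, which is the usual left-join outcome.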





[jira] [Updated] (KAFKA-14747) FK join should record discarded subscription responses

2023-02-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14747:

Description: 
FK-joins are subject to a race condition: If the left-hand side record is 
updated, a subscription is sent to the right-hand side (including a hash value 
of the left-hand side record), and the right-hand side might send back join 
responses (also including the original hash). The left-hand side only processes 
the responses if the returned hash matches the current hash of the left-hand 
side record, because a different hash implies that the left-hand side record 
was updated in the meantime (including sending a new subscription to the 
right-hand side), and thus the data is stale and the response should not be 
processed (joining the response to the new record could lead to incorrect 
results).

A similar thing can happen on a right-hand side update that triggers a 
response, which might be dropped if the left-hand side record was updated in 
parallel.

While the behavior is correct, we don't record if this happens. We should 
consider recording this using the existing "dropped record" sensor or maybe 
adding a new sensor.

  was:
FK-join are subject to a race condition. If the left-hand side record is 
updated, a subscription is sent to the right-hand side (including a hash value 
of the left-hand side record), and the right-hand side might send back join 
responses (also including the original hash). The left hand side only processed 
the responses if the hash matches, because a different hash implies that the 
left hand side row was updated in the mean-time (including sending a new 
subscription to the right hand side), and thus the data is stale and the 
response should not be processed.

A similar thing can happen on a right hand side update that triggers a 
response, that might be dropped if the left hand side row was updated in 
parallel.

While the behavior is correct, we don't record if this happens. We should 
consider to record this using the existing "dropped record" sensor or maybe add 
a new sensor.


> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, needs-kip, newbie
>
> FK-joins are subject to a race condition: If the left-hand side record is 
> updated, a subscription is sent to the right-hand side (including a hash 
> value of the left-hand side record), and the right-hand side might send back 
> join responses (also including the original hash). The left-hand side only 
> processes the responses if the returned hash matches the current hash of the 
> left-hand side record, because a different hash implies that the left-hand 
> side record was updated in the meantime (including sending a new 
> subscription to the right-hand side), and thus the data is stale and the 
> response should not be processed (joining the response to the new record 
> could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a 
> response, which might be dropped if the left-hand side record was updated in 
> parallel.
> While the behavior is correct, we don't record if this happens. We should 
> consider recording this using the existing "dropped record" sensor or maybe 
> adding a new sensor.





[jira] [Created] (KAFKA-14747) FK join should record discarded subscription responses

2023-02-23 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-14747:
---

 Summary: FK join should record discarded subscription responses
 Key: KAFKA-14747
 URL: https://issues.apache.org/jira/browse/KAFKA-14747
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


FK-joins are subject to a race condition. If the left-hand side record is 
updated, a subscription is sent to the right-hand side (including a hash value 
of the left-hand side record), and the right-hand side might send back join 
responses (also including the original hash). The left-hand side only processes 
the responses if the hash matches, because a different hash implies that the 
left-hand side row was updated in the meantime (including sending a new 
subscription to the right-hand side), and thus the data is stale and the 
response should not be processed.

A similar thing can happen on a right-hand side update that triggers a 
response, which might be dropped if the left-hand side row was updated in 
parallel.

While the behavior is correct, we don't record if this happens. We should 
consider recording this using the existing "dropped record" sensor or maybe 
adding a new sensor.
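The hash check and the proposed bookkeeping can be sketched as follows. This is a minimal illustration, not the Kafka Streams implementation: the class and method names are hypothetical, the hashes are plain byte arrays, and an `AtomicLong` stands in for whichever sensor would actually be used.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

class FkResponseFilterSketch {

    // Hypothetical stand-in for a "dropped record" style sensor.
    private final AtomicLong discardedResponses = new AtomicLong();

    // A response is only processed if it carries the hash of the current left-hand record.
    boolean shouldProcess(byte[] currentLeftHash, byte[] responseHash) {
        if (Arrays.equals(currentLeftHash, responseHash)) {
            return true;
        }
        // Stale response: the left-hand record changed after the subscription was sent.
        discardedResponses.incrementAndGet();
        return false;
    }

    long discardedCount() {
        return discardedResponses.get();
    }

    static long demo() {
        FkResponseFilterSketch filter = new FkResponseFilterSketch();
        byte[] hashV1 = {1, 2, 3}; // hash of the left-hand record before the update
        byte[] hashV2 = {4, 5, 6}; // hash of the left-hand record after the update
        filter.shouldProcess(hashV2, hashV1); // response to the old subscription, discarded
        filter.shouldProcess(hashV2, hashV2); // response to the new subscription, processed
        return filter.discardedCount();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Counting at the point where the response is discarded keeps the join result unchanged and only makes the existing behavior observable.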





[jira] [Resolved] (KAFKA-14128) Kafka Streams terminates on topic check

2023-02-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14128.
-
Fix Version/s: 3.5.0
   3.4.1
   Resolution: Fixed

> Kafka Streams terminates on topic check
> ---
>
> Key: KAFKA-14128
> URL: https://issues.apache.org/jira/browse/KAFKA-14128
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
> Environment: Any
>Reporter: Patrik Kleindl
>Assignee: Lucia Cerchie
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
>
> Our streams application shut down unexpectedly after some network issues that 
> should have been easily recoverable.
> Logs:
>  
> {code:java}
> 2022-07-29 13:39:37.854  INFO 25843 --- [348aefeff-admin] 
> org.apache.kafka.clients.NetworkClient   : [AdminClient 
> clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Disconnecting 
> from node 3 due to request timeout.
> 2022-07-29 13:39:37.854  INFO 25843 --- [348aefeff-admin] 
> org.apache.kafka.clients.NetworkClient   : [AdminClient 
> clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Cancelled 
> in-flight METADATA request with correlation id 985 due to node 3 being 
> disconnected (elapsed time since creation: 60023ms, elapsed time since send: 
> 60023ms, request timeout: 3ms)
> 2022-07-29 13:39:37.867 ERROR 25843 --- [-StreamThread-1] 
> o.a.k.s.p.i.InternalTopicManager : stream-thread [main] Unexpected 
> error during topic description for 
> L.DII.A-COGROUPKSTREAM-AGGREGATE-STATE-STORE-03-changelog.
> Error message was: org.apache.kafka.common.errors.TimeoutException: 
> Call(callName=describeTopics, deadlineMs=1659101977830, tries=1, 
> nextAllowedTryMs=1659101977955) timed out at 1659101977855 after 1 attempt(s)
> 2022-07-29 13:39:37.869  INFO 25843 --- [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread     : stream-thread 
> [L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-StreamThread-1] State 
> transition from RUNNING to PENDING_SHUTDOWN
> {code}
> I think the relevant code is in 
> [https://github.com/apache/kafka/blob/31ff6d7f8af57e8c39061f31774a61bd1728904e/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L524|https://github.com/apache/kafka/blob/31ff6d7f8af57e8c39061f31774a61bd1728904e/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L523-L550]
> {code:java}
> topicFuture.getValue().get();{code}
> without a timeout value cannot throw a TimeoutException, so the 
> TimeoutException of the AdminClient will be an ExecutionException and hit the 
> last else branch where the StreamsException is thrown.
> Possible fix:
> Use the KafkaFuture method with timeout:
> {code:java}
> public abstract T get(long timeout, TimeUnit unit) throws 
> InterruptedException, ExecutionException,
> TimeoutException;{code}
> instead of 
> {code:java}
> public abstract T get() throws InterruptedException, ExecutionException;{code}
>  
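The difference between the two `get` variants can be sketched with the JDK's `CompletableFuture`, which follows the same `Future` contract. This is an illustration under that assumption and not the `InternalTopicManager` code: the class and method names are hypothetical, and `java.util.concurrent.TimeoutException` stands in for the Kafka `TimeoutException` raised by the admin client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureGetSketch {

    // get() without a timeout: a timeout inside the call can only arrive wrapped.
    static String classifyUntimedGet(CompletableFuture<String> future) {
        try {
            future.get();
            return "ok";
        } catch (ExecutionException e) {
            return e.getCause() instanceof TimeoutException ? "wrapped-timeout" : "other-failure";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // get(timeout, unit): the caller's own deadline surfaces as TimeoutException directly.
    static String classifyTimedGet(CompletableFuture<String> future, long timeoutMs) {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "ok";
        } catch (TimeoutException e) {
            return "caller-timeout";
        } catch (ExecutionException e) {
            return e.getCause() instanceof TimeoutException ? "wrapped-timeout" : "other-failure";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    static String demoWrapped() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new TimeoutException("simulated request timeout"));
        return classifyUntimedGet(failed);
    }

    static String demoCallerTimeout() {
        // This future is never completed, so only the caller's deadline can end the wait.
        return classifyTimedGet(new CompletableFuture<>(), 10);
    }

    public static void main(String[] args) {
        System.out.println(demoWrapped());
        System.out.println(demoCallerTimeout());
    }
}
```

Either way, code that wants to treat the timeout as retriable has to inspect the cause of the `ExecutionException` as well, because a timeout raised inside the call still arrives wrapped.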





Re: [DISCUSS] KIP-904 Kafka Streams - Guarantee subtractor is called before adder if key has not changed

2023-02-22 Thread Matthias J. Sax

Thanks for the KIP. Overall LGTM.

I think you can start a VOTE.


-Matthias

On 2/22/23 5:56 PM, Fq Public wrote:

Just wanted to bump this thread for visibility.
Thanks to everyone who has participated in the discussion so far.

Thanks,
Farooq

On 2023/02/14 19:32:53 Guozhang Wang wrote:

Thanks Farooq, that looks good to me.

Guozhang

On Sun, Feb 12, 2023 at 9:01 AM Dharin Shah  wrote:


Hello Farooq,

This is actually a great idea, we have dealt with this by using an array
instead of a set.
+1 to this :)

Thank you,
Dharin

On Sun, Feb 12, 2023 at 8:11 PM Fq Public  wrote:


Hi Guozhang,

Thanks for reading over my proposal!


> Regarding the format, I'm just thinking if we can change the type of
> "INT newDataLength" to UINT32?

Good idea, I've updated the KIP to reflect UINT32 since it makes clear the
value can never be less than zero.


> `.equals` default implementation on Object is by reference, so if the
> groupBy did not generate a new object, that may still pass. This means that
> even if user does not implement the `.equals` function, if the same object
> is returned then this feature would still be triggered, is that correct?

Correct, I've updated the KIP to call out this edge-case clearly as
follows:


> Since the default `.equals` implementation for an `Object` is by
> reference, if a user's `groupBy` returns the same reference for the key,
> then the oldKey and the newKey will naturally `.equals` each other. This
> will result in a single event being sent to the repartition topic. This
> change in behaviour should be considered a "bug-fix" rather than a
> "breaking change" as the semantics of the operation remain unchanged, the
> only thing that changes for users is they no longer see transient
> "inconsistent" states. In the worst case, users in this situation will
> need to update any strict tests that check specifically for the presence of
> transient "inconsistent" states.
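
The edge case can be illustrated with a small sketch. The class names and the keyUnchanged helper are hypothetical and only stand in for the oldKey/newKey comparison; they are not taken from the KIP or from Kafka Streams.

```java
import java.util.Objects;

class EqualsByReferenceDemo {

    /** Key type that does NOT override equals, so Object.equals (reference identity) applies. */
    static class PlainKey {
        final String id;
        PlainKey(String id) { this.id = id; }
    }

    /** Key type with value-based equals/hashCode. */
    static class ValueKey {
        final String id;
        ValueKey(String id) { this.id = id; }
        @Override public boolean equals(Object o) {
            return o instanceof ValueKey && ((ValueKey) o).id.equals(id);
        }
        @Override public int hashCode() { return Objects.hash(id); }
    }

    /** Hypothetical stand-in for the check "did the grouping key stay the same?". */
    static boolean keyUnchanged(Object oldKey, Object newKey) {
        return Objects.equals(oldKey, newKey);
    }

    public static void main(String[] args) {
        PlainKey shared = new PlainKey("a");
        // Same reference returned twice: equal even without an equals override.
        System.out.println(keyUnchanged(shared, shared));
        // Distinct instances with the same content: not equal without an override.
        System.out.println(keyUnchanged(new PlainKey("a"), new PlainKey("a")));
        // Distinct instances with value-based equals: equal.
        System.out.println(keyUnchanged(new ValueKey("a"), new ValueKey("a")));
    }
}
```

Running main prints true, false, true: only the first and third cases would be treated as "key has not changed" in this sketch.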

What do you think?

Thanks,
Farooq

On 2023/02/07 18:02:24 Guozhang Wang wrote:

Hello Farooq,

Thanks for the very detailed proposal! I think this is a great idea.
Just a few thoughts:

1. I regret that we over-optimized the Changed serde format for
footprint while making it less extensible. It seems to me that a two
rolling bounce migration is unavoidable. Regarding the format, I'm
just thinking if we can change the type of "INT newDataLength" to
UINT32?

2. `.equals` default implementation on Object is by reference, so if
the groupBy did not generate a new object, that may still pass. This
means that even if user does not implement the `.equals` function, if
the same object is returned then this feature would still be
triggered, is that correct?


Guozhang

On Mon, Feb 6, 2023 at 5:05 AM Fq Public  wrote:


Hi everyone,

I'd like to share a new KIP for discussion:
https://cwiki.apache.org/confluence/x/P5VbDg

This could be considered mostly as a "bug fix" but we wanted to raise a KIP
for discussion because it involves changes to the serialization format of
an internal topic which raises backward compatibility considerations.

Please take a look and let me know what you think.

Thanks,
Farooq




Re: [DISCUSS] KIP-907: Add Boolean Serde to public interface

2023-02-22 Thread Matthias J. Sax

Thanks for the KIP.

Overall LGTM.

One comment: Both `BooleanSerializer` and `BooleanDeserializer` are also 
new classes that are added and should be listed explicitly similar to 
`BooleanSerde` in the "Public Interfaces" section of the KIP.
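
For readers unfamiliar with what such a serializer/deserializer pair does, here is a minimal sketch. It is self-contained and does not implement the Kafka Serializer/Deserializer interfaces; the class name and the single-byte encoding (0x01 for true, 0x00 for false) are assumptions for illustration, not something stated in this thread.

```java
class BooleanSerdeSketch {
    // Assumed wire format: exactly one byte per value.
    private static final byte TRUE = 0x01;
    private static final byte FALSE = 0x00;

    /** Encodes a Boolean as one byte; null stays null (no payload). */
    static byte[] serialize(Boolean data) {
        if (data == null) {
            return null;
        }
        return new byte[] { data ? TRUE : FALSE };
    }

    /** Decodes one byte back into a Boolean and rejects anything else. */
    static Boolean deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length != 1) {
            throw new IllegalArgumentException("Expected exactly 1 byte, got " + data.length);
        }
        if (data[0] == TRUE) {
            return true;
        }
        if (data[0] == FALSE) {
            return false;
        }
        throw new IllegalArgumentException("Unexpected byte for Boolean: " + data[0]);
    }

    public static void main(String[] args) {
        System.out.println(deserialize(serialize(true)));
        System.out.println(deserialize(serialize(false)));
        System.out.println(deserialize(serialize(null)));
    }
}
```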



-Matthias

On 2/21/23 10:52 AM, SpacRocket wrote:

Hello Everyone,

I’d like to get a discussion going for KIP-907:
KIP-907: Add Boolean Serde to public interface - Apache Kafka - Apache 
Software Foundation 

cwiki.apache.org 


Which adds Boolean Serde to the public interface.

The KIP contains the details of how I want to do this and what internal 
code I need to change.


Looking forward to the group’s feedback! :)

Kind regards
Jakub


[jira] [Assigned] (KAFKA-12549) Allow state stores to opt-in transactional support

2023-02-21 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-12549:
---

Assignee: (was: Alex Sorokoumov)

> Allow state stores to opt-in transactional support
> --
>
> Key: KAFKA-12549
> URL: https://issues.apache.org/jira/browse/KAFKA-12549
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> Right now Kafka Streams' EOS implementation does not make any assumptions 
> about the state store's transactional support. Allowing the state stores to 
> optionally provide transactional support can have multiple benefits. E.g., if 
> we add some APIs into the {{StateStore}} interface, like {{beginTxn}}, 
> {{commitTxn}} and {{abortTxn}}. Streams library can determine if these are 
> supported via an additional {{boolean transactional()}} API, and if yes, 
> these APIs can be used under both ALOS and EOS like the following (otherwise 
> just fall back to the normal processing logic):
> Within thread processing loops:
> 1. store.beginTxn
> 2. store.put // during processing
> 3. streams commit // either through eos protocol or not
> 4. store.commitTxn
> 5. start the next txn by store.beginTxn
> If the state stores allow Streams to do something like above, we can have the 
> following benefits:
> * Reduce the duplicated records upon crashes for ALOS (note this is not EOS 
> still, but some middle-ground where uncommitted data within a state store 
> would not be retained if store.commitTxn failed).
> * No need to wipe the state store and re-bootstrap from scratch upon crashes 
> for EOS. E.g., if a crash failure happens between the completion of the 
> streams commit and store.commitTxn, we can instead just roll forward the 
> transaction by replaying the changelog from the second most recent streams 
> committed offset towards the most recent committed offset.
> * Remote stores that support txn then do not need to support wiping 
> (https://issues.apache.org/jira/browse/KAFKA-12475).
> * We can fix the known issues of emit-on-change 
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams).
> * We can support "query committed data only" for interactive queries (see 
> below for reasons).
> As for the implementation of these APIs, there are several options:
> * The state store itself has natural transaction features (e.g. RocksDB).
> * Use an in-memory buffer for all puts within a transaction, and upon 
> `commitTxn` write the whole buffer as a batch to the underlying state store, 
> or just drop the whole buffer upon aborting. Then for interactive queries, 
> one can optionally only query the underlying store for committed data only.
> * Use a separate store as the transient persistent buffer. Upon `beginTxn` 
> create a new empty transient store, and upon `commitTxn` merge the store into 
> the underlying store. Same applies for interactive querying committed-only 
> data. This has a benefit compared with the one above that there's no memory 
> pressure even with long transactions, but incurs more complexity / 
> performance overhead with the separate persistent store.
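
The in-memory buffer option, together with the beginTxn / put / commitTxn loop from the ticket, could look roughly like the sketch below. The method names follow the ticket, but the interface, its signatures, and the getCommitted method are assumptions made for illustration; none of this is an existing Kafka Streams API.

```java
import java.util.HashMap;
import java.util.Map;

class BufferedTxnStoreSketch {

    /** Hypothetical subset of the proposed API; signatures are assumed. */
    interface TransactionalStore<K, V> {
        boolean transactional();
        void beginTxn();
        void put(K key, V value);
        V get(K key);           // sees uncommitted writes of the open transaction
        V getCommitted(K key);  // "query committed data only"
        void commitTxn();
        void abortTxn();
    }

    /** Buffers all puts of a transaction in memory and applies them as one batch on commit. */
    static class InMemoryBufferedStore<K, V> implements TransactionalStore<K, V> {
        private final Map<K, V> committed = new HashMap<>();
        private Map<K, V> buffer = null; // null means no open transaction

        @Override public boolean transactional() { return true; }

        @Override public void beginTxn() {
            if (buffer != null) throw new IllegalStateException("transaction already open");
            buffer = new HashMap<>();
        }

        @Override public void put(K key, V value) {
            if (buffer == null) throw new IllegalStateException("no open transaction");
            buffer.put(key, value);
        }

        @Override public V get(K key) {
            if (buffer != null && buffer.containsKey(key)) return buffer.get(key);
            return committed.get(key);
        }

        @Override public V getCommitted(K key) { return committed.get(key); }

        @Override public void commitTxn() {
            if (buffer == null) throw new IllegalStateException("no open transaction");
            committed.putAll(buffer); // write the whole buffer as a batch
            buffer = null;
        }

        @Override public void abortTxn() {
            buffer = null; // drop the whole buffer
        }
    }

    public static void main(String[] args) {
        InMemoryBufferedStore<String, Long> store = new InMemoryBufferedStore<>();
        store.beginTxn();               // 1. store.beginTxn
        store.put("count", 1L);         // 2. store.put during processing
        // 3. the streams commit would happen here (not modelled in this sketch)
        store.commitTxn();              // 4. store.commitTxn
        store.beginTxn();               // 5. start the next txn
        store.put("count", 2L);
        store.abortTxn();               // uncommitted write is discarded
        System.out.println(store.getCommitted("count"));
    }
}
```

The trade-off named in the ticket shows up directly: the buffer lives on the heap, so long transactions increase memory pressure, which is what the separate transient store option avoids.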





[jira] [Assigned] (KAFKA-13024) Kafka Streams is dropping messages with null key during repartition

2023-02-21 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-13024:
---

Assignee: (was: Alex Sorokoumov)

> Kafka Streams is dropping messages with null key during repartition
> ---
>
> Key: KAFKA-13024
> URL: https://issues.apache.org/jira/browse/KAFKA-13024
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Damien Gasparina
>Priority: Major
>
> {{KStream.repartition}} is silently filtering messages with null keys. A 
> single topology like {{.stream().repartition().to()}} would filter all 
> messages with null key.
> The cause: we are adding a filter before the source & sink nodes 
> ([https://github.com/apache/kafka/blob/2.8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L1060-L1064]).
>  It looks like we are doing that because this method is also used for 
> building KTable.
> Null-key messages are valid for a KStream, so this looks like a regression: 
> the previous {{.through()}} did not filter null-key messages.





[jira] [Assigned] (KAFKA-9803) Allow producers to recover gracefully from transaction timeouts

2023-02-21 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-9803:
--

Assignee: (was: Boyang Chen)

> Allow producers to recover gracefully from transaction timeouts
> ---
>
> Key: KAFKA-9803
> URL: https://issues.apache.org/jira/browse/KAFKA-9803
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer , streams
>Reporter: Jason Gustafson
>Priority: Major
>  Labels: needs-kip
>
> Transaction timeouts are detected by the transaction coordinator. When the 
> coordinator detects a timeout, it bumps the producer epoch and aborts the 
> transaction. The epoch bump is necessary in order to prevent the current 
> producer from being able to begin writing to a new transaction which was not 
> started through the coordinator.  
> Transactions may also be aborted if a new producer with the same 
> `transactional.id` starts up. Similarly this results in an epoch bump. 
> Currently the coordinator does not distinguish these two cases. Both will end 
> up as a `ProducerFencedException`, which means the producer needs to shut 
> itself down. 
> We can improve this with the new APIs from KIP-360. When the coordinator 
> times out a transaction, it can remember that fact and allow the existing 
> producer to claim the bumped epoch and continue. Roughly the logic would work 
> like this:
> 1. When a transaction times out, set lastProducerEpoch to the current epoch 
> and do the normal bump.
> 2. Any transactional requests from the old epoch result in a new 
> TRANSACTION_TIMED_OUT error code, which is propagated to the application.
> 3. The producer recovers by sending InitProducerId with the current epoch. 
> The coordinator returns the bumped epoch.
> One issue that needs to be addressed is how to handle INVALID_PRODUCER_EPOCH 
> from Produce requests. Partition leaders will not generally know if a bumped 
> epoch was the result of a timed out transaction or a fenced producer. 
> Possibly the producer can treat these errors as abortable when they come from 
> Produce responses. In that case, the user would try to abort the transaction 
> and then we can see if it was due to a timeout or otherwise.
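
The three steps above can be sketched as a tiny epoch state machine. This is a simplified model under stated assumptions, not the transaction coordinator's code: the class, the method names, the -1 sentinels, and the Result enum are hypothetical; only lastProducerEpoch and the TRANSACTION_TIMED_OUT error name come from the ticket.

```java
class TxnTimeoutRecoverySketch {

    enum Result { OK, TRANSACTION_TIMED_OUT, PRODUCER_FENCED }

    /** Hypothetical per-transactional.id state kept by the coordinator. */
    static class CoordinatorState {
        int currentEpoch;
        int lastProducerEpoch = -1; // -1: no timed-out epoch remembered

        CoordinatorState(int epoch) { this.currentEpoch = epoch; }

        /** Step 1: remember the epoch that timed out, then do the normal bump. */
        void onTransactionTimeout() {
            lastProducerEpoch = currentEpoch;
            currentEpoch++;
        }

        /** A new producer with the same transactional.id: bump without remembering the old epoch. */
        void onNewProducerInstance() {
            lastProducerEpoch = -1;
            currentEpoch++;
        }

        /** Step 2: classify a transactional request by the epoch it carries. */
        Result onTransactionalRequest(int producerEpoch) {
            if (producerEpoch == currentEpoch) return Result.OK;
            if (producerEpoch == lastProducerEpoch) return Result.TRANSACTION_TIMED_OUT;
            return Result.PRODUCER_FENCED;
        }

        /** Step 3: InitProducerId with the producer's epoch; returns the bumped epoch, or -1 if fenced. */
        int onInitProducerId(int producerEpoch) {
            if (producerEpoch == currentEpoch || producerEpoch == lastProducerEpoch) {
                return currentEpoch;
            }
            return -1;
        }
    }

    public static void main(String[] args) {
        CoordinatorState timedOut = new CoordinatorState(5);
        timedOut.onTransactionTimeout();
        System.out.println(timedOut.onTransactionalRequest(5)); // old epoch after a timeout
        System.out.println(timedOut.onInitProducerId(5));       // producer claims the bumped epoch

        CoordinatorState fenced = new CoordinatorState(5);
        fenced.onNewProducerInstance();
        System.out.println(fenced.onTransactionalRequest(5));   // old epoch after a real fence
    }
}
```

The sketch only covers requests that reach the coordinator; the open question about INVALID_PRODUCER_EPOCH from partition leaders is not modelled.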





[jira] [Resolved] (KAFKA-14530) Check state updater more than once in process loops

2023-02-21 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14530.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

> Check state updater more than once in process loops
> ---
>
> Key: KAFKA-14530
> URL: https://issues.apache.org/jira/browse/KAFKA-14530
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Minor
> Fix For: 3.5.0
>
>
> In the new state restoration code, the state updater needs to be checked 
> regularly by the main thread to transfer ownership of tasks back to the main 
> thread once the state of the task is restored. The more often we check this, 
> the faster we can start processing the tasks.
> Currently, we only check the state updater once in every loop iteration of 
> the state updater. While we have not observed that this is too infrequent, 
> we can easily increase the number of checks by moving the check 
> inside the inner processing loop. This would mean that once we have iterated 
> over `numIterations` records, we can already start processing tasks that have 
> finished restoration in the meantime.
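
The idea of checking inside the inner processing loop can be sketched as follows. This is a single-threaded toy model; StateUpdater, drainRestoredTasks, and pollLoopIteration are hypothetical names, and only `numIterations` is taken from the ticket.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class StateUpdaterCheckSketch {

    /** Hypothetical stand-in for the state updater: hands over tasks whose restoration finished. */
    static class StateUpdater {
        private final Queue<String> restored = new ArrayDeque<>();

        void finishRestoration(String task) { restored.add(task); }

        List<String> drainRestoredTasks() {
            List<String> drained = new ArrayList<>(restored);
            restored.clear();
            return drained;
        }
    }

    /**
     * One outer loop iteration that processes recordsInBatch records in chunks of
     * numIterations and checks the state updater after every chunk.
     * Returns how many times the state updater was checked.
     */
    static int pollLoopIteration(StateUpdater updater, List<String> activeTasks,
                                 int recordsInBatch, int numIterations) {
        int checks = 0;
        int processed = 0;
        while (processed < recordsInBatch) {
            int chunk = Math.min(numIterations, recordsInBatch - processed);
            processed += chunk; // stand-in for processing `chunk` records
            // Check inside the inner loop, so restored tasks are picked up early.
            activeTasks.addAll(updater.drainRestoredTasks());
            checks++;
        }
        return checks;
    }

    public static void main(String[] args) {
        StateUpdater updater = new StateUpdater();
        updater.finishRestoration("task-1");
        List<String> active = new ArrayList<>();
        int checks = pollLoopIteration(updater, active, 10, 3);
        System.out.println(checks + " checks, active tasks: " + active);
    }
}
```

With 10 records and numIterations = 3 the sketch checks four times per outer iteration instead of once.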





[jira] [Commented] (KAFKA-14442) GlobalKTable restoration waits requestTimeout during application restart

2023-02-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690563#comment-17690563
 ] 

Matthias J. Sax commented on KAFKA-14442:
-

Just learned about https://issues.apache.org/jira/browse/KAFKA-12980 – we 
should verify if it would actually fix this issue, similar to 
https://issues.apache.org/jira/browse/KAFKA-14713 

> GlobalKTable restoration waits requestTimeout during application restart
> 
>
> Key: KAFKA-14442
> URL: https://issues.apache.org/jira/browse/KAFKA-14442
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Gergo L��p
>Priority: Major
>
> Using "exactly_once_beta" the highWatermark "skips" an offset after a 
> transaction but in this case the global .checkpoint file contains a different 
> value (smaller by 1) than the highWatermark.
> During restoration because of the difference between the checkpoint and 
> highWatermark a poll will be attempted but sometimes there is no new record 
> on the partition and the GlobalStreamThread has to wait for the 
> requestTimeout to continue.
> If there is any new record on the partition the problem does not occur.





[jira] [Resolved] (KAFKA-14713) Kafka Streams global table startup takes too long

2023-02-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14713.
-
Resolution: Fixed

> Kafka Streams global table startup takes too long
> -
>
> Key: KAFKA-14713
> URL: https://issues.apache.org/jira/browse/KAFKA-14713
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.2
>Reporter: Tamas
>Priority: Critical
> Fix For: 3.2.0
>
>
> *Some context first*
> We have a spring based kafka streams application. This application is 
> listening to two topics. Let's call them apartment and visitor. The 
> apartments are stored in a global table, while the visitors are in the stream 
> we are processing, and at one point we are joining the visitor stream 
> together with the apartment table. In our test environment, both topics 
> contain 10 partitions.
> *Issue*
> At first deployment, everything goes fine, the global table is built and all 
> entries in the stream are processed.
> After everything is finished, we shut down the application, restart it and 
> send out a new set of visitors. The application seemingly does not respond.
> After some more debugging it turned out that it simply takes 5 minutes to 
> start up, because the global table takes 30 seconds (default value for the 
> global request timeout) to accept that there are no messages in the apartment 
> topics, for each and every partition. If we send out the list of apartments 
> as new messages, the application starts up immediately.
> To make matters worse, we have clients with 96 partitions, where the startup 
> time would be 48 minutes. Not having messages in the topics between 
> application shutdown and restart is a valid use case, so this is quite a big 
> problem.
> *Possible workarounds*
> We could reduce the request timeout, but since this value is not specific for 
> the global table initialization, but a global request timeout for a lot of 
> things, we do not know what else it will affect, so we are not very keen on 
> doing that. Even then, it would mean a 1.5 minute delay for this particular 
> client (more if we will have other use cases in the future where we will need 
> to use more global tables), which is far too much, considering that the 
> application would be able to otherwise start in about 20 seconds.
> *Potential solutions we see*
>  # Introduce a specific global table initialization timeout in 
> GlobalStateManagerImpl. Then we would be able to safely modify that value 
> without fear of making some other part of kafka unstable.
>  # Parallelize the initialization of the global table partitions in 
> GlobalStateManagerImpl: knowing that the delay at startup is constant instead 
> of linear with the number of partitions would be a huge help.
>  # As long as we receive a response, accept the empty map in the 
> KafkaConsumer, and continue instead of going into a busy-waiting loop.
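
The numbers in the report follow from a simple product of partition count and timeout. The sketch below reproduces that arithmetic and contrasts it with the constant delay that parallel initialization (solution 2) would give; the class and method names are hypothetical.

```java
import java.time.Duration;

class GlobalTableStartupDelay {

    /** One partition after another, each waiting for the full timeout. */
    static Duration sequentialDelay(int partitions, Duration perPartitionTimeout) {
        return perPartitionTimeout.multipliedBy(partitions);
    }

    /** All partitions initialized in parallel: the waits overlap. */
    static Duration parallelDelay(int partitions, Duration perPartitionTimeout) {
        return partitions == 0 ? Duration.ZERO : perPartitionTimeout;
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofSeconds(30); // timeout value named in the report

        System.out.println(sequentialDelay(10, timeout).toMinutes() + " min for 10 partitions");
        System.out.println(sequentialDelay(96, timeout).toMinutes() + " min for 96 partitions");
        System.out.println(parallelDelay(96, timeout).getSeconds() + " s if initialized in parallel");
    }
}
```

This prints 5 min, 48 min, and 30 s, matching the 5 minute and 48 minute figures in the description.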





[jira] [Commented] (KAFKA-14713) Kafka Streams global table startup takes too long

2023-02-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690560#comment-17690560
 ] 

Matthias J. Sax commented on KAFKA-14713:
-

Ah. Thanks. That makes sense. Did not look into the consumer code, only 
streams. So it's fixed via https://issues.apache.org/jira/browse/KAFKA-12980 in 
3.2.0 – updated the ticket accordingly. Thanks for getting back. It bugged me 
that I did not understand it :) 

> Kafka Streams global table startup takes too long
> -
>
> Key: KAFKA-14713
> URL: https://issues.apache.org/jira/browse/KAFKA-14713
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.2
>Reporter: Tamas
>Priority: Critical
> Fix For: 3.2.0
>
>
> *Some context first*
> We have a spring based kafka streams application. This application is 
> listening to two topics. Let's call them apartment and visitor. The 
> apartments are stored in a global table, while the visitors are in the stream 
> we are processing, and at one point we are joining the visitor stream 
> together with the apartment table. In our test environment, both topics 
> contain 10 partitions.
> *Issue*
> At first deployment, everything goes fine, the global table is built and all 
> entries in the stream are processed.
> After everything is finished, we shut down the application, restart it and 
> send out a new set of visitors. The application seemingly does not respond.
> After some more debugging it turned out that it simply takes 5 minutes to 
> start up, because the global table takes 30 seconds (default value for the 
> global request timeout) to accept that there are no messages in the apartment 
> topics, for each and every partition. If we send out the list of apartments 
> as new messages, the application starts up immediately.
> To make matters worse, we have clients with 96 partitions, where the startup 
> time would be 48 minutes. Not having messages in the topics between 
> application shutdown and restart is a valid use case, so this is quite a big 
> problem.
> *Possible workarounds*
> We could reduce the request timeout, but since this value is not specific for 
> the global table initialization, but a global request timeout for a lot of 
> things, we do not know what else it will affect, so we are not very keen on 
> doing that. Even then, it would mean a 1.5 minute delay for this particular 
> client (more if we will have other use cases in the future where we will need 
> to use more global tables), which is far too much, considering that the 
> application would be able to otherwise start in about 20 seconds.
> *Potential solutions we see*
>  # Introduce a specific global table initialization timeout in 
> GlobalStateManagerImpl. Then we would be able to safely modify that value 
> without fear of making some other part of kafka unstable.
>  # Parallelize the initialization of the global table partitions in 
> GlobalStateManagerImpl: knowing that the delay at startup is constant instead 
> of linear with the number of partitions would be a huge help.
>  # As long as we receive a response, accept the empty map in the 
> KafkaConsumer, and continue instead of going into a busy-waiting loop.





[jira] [Reopened] (KAFKA-14713) Kafka Streams global table startup takes too long

2023-02-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-14713:
-

> Kafka Streams global table startup takes too long
> -
>
> Key: KAFKA-14713
> URL: https://issues.apache.org/jira/browse/KAFKA-14713
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.2
>Reporter: Tamas
>Priority: Critical
> Fix For: 3.2.0
>
>
> *Some context first*
> We have a spring based kafka streams application. This application is 
> listening to two topics. Let's call them apartment and visitor. The 
> apartments are stored in a global table, while the visitors are in the stream 
> we are processing, and at one point we are joining the visitor stream 
> together with the apartment table. In our test environment, both topics 
> contain 10 partitions.
> *Issue*
> At first deployment, everything goes fine, the global table is built and all 
> entries in the stream are processed.
> After everything is finished, we shut down the application, restart it and 
> send out a new set of visitors. The application seemingly does not respond.
> After some more debugging it turned out that it simply takes 5 minutes to 
> start up, because the global table takes 30 seconds (default value for the 
> global request timeout) to accept that there are no messages in the apartment 
> topics, for each and every partition. If we send out the list of apartments 
> as new messages, the application starts up immediately.
> To make matters worse, we have clients with 96 partitions, where the startup 
> time would be 48 minutes. Not having messages in the topics between 
> application shutdown and restart is a valid use case, so this is quite a big 
> problem.
> *Possible workarounds*
> We could reduce the request timeout, but since this value is not specific for 
> the global table initialization, but a global request timeout for a lot of 
> things, we do not know what else it will affect, so we are not very keen on 
> doing that. Even then, it would mean a 1.5 minute delay for this particular 
> client (more if we will have other use cases in the future where we will need 
> to use more global tables), which is far too much, considering that the 
> application would be able to otherwise start in about 20 seconds.
> *Potential solutions we see*
>  # Introduce a specific global table initialization timeout in 
> GlobalStateManagerImpl. Then we would be able to safely modify that value 
> without fear of making some other part of kafka unstable.
>  # Parallelize the initialization of the global table partitions in 
> GlobalStateManagerImpl: knowing that the delay at startup is constant instead 
> of linear with the number of partitions would be a huge help.
>  # As long as we receive a response, accept the empty map in the 
> KafkaConsumer, and continue instead of going into a busy-waiting loop.





[jira] [Updated] (KAFKA-14713) Kafka Streams global table startup takes too long

2023-02-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14713:

Fix Version/s: 3.2.0
   (was: 3.4.0)

> Kafka Streams global table startup takes too long
> -
>
> Key: KAFKA-14713
> URL: https://issues.apache.org/jira/browse/KAFKA-14713
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.2
>Reporter: Tamas
>Priority: Critical
> Fix For: 3.2.0
>
>
> *Some context first*
> We have a spring based kafka streams application. This application is 
> listening to two topics. Let's call them apartment and visitor. The 
> apartments are stored in a global table, while the visitors are in the stream 
> we are processing, and at one point we are joining the visitor stream 
> together with the apartment table. In our test environment, both topics 
> contain 10 partitions.
> *Issue*
> At first deployment, everything goes fine, the global table is built and all 
> entries in the stream are processed.
> After everything is finished, we shut down the application, restart it and 
> send out a new set of visitors. The application seemingly does not respond.
> After some more debugging it turned out that it simply takes 5 minutes to 
> start up, because the global table takes 30 seconds (default value for the 
> global request timeout) to accept that there are no messages in the apartment 
> topics, for each and every partition. If we send out the list of apartments 
> as new messages, the application starts up immediately.
> To make matters worse, we have clients with 96 partitions, where the startup 
> time would be 48 minutes. Not having messages in the topics between 
> application shutdown and restart is a valid use case, so this is quite a big 
> problem.
> *Possible workarounds*
> We could reduce the request timeout, but since this value is not specific for 
> the global table initialization, but a global request timeout for a lot of 
> things, we do not know what else it will affect, so we are not very keen on 
> doing that. Even then, it would mean a 1.5 minute delay for this particular 
> client (more if we will have other use cases in the future where we will need 
> to use more global tables), which is far too much, considering that the 
> application would be able to otherwise start in about 20 seconds.
> *Potential solutions we see*
>  # Introduce a specific global table initialization timeout in 
> GlobalStateManagerImpl. Then we would be able to safely modify that value 
> without fear of making some other part of kafka unstable.
>  # Parallelize the initialization of the global table partitions in 
> GlobalStateManagerImpl: knowing that the delay at startup is constant instead 
> of linear with the number of partitions would be a huge help.
>  # As long as we receive a response, accept the empty map in the 
> KafkaConsumer, and continue instead of going into a busy-waiting loop.
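
The startup figures quoted in the report follow from multiplying the per-partition wait by the partition count. The sketch below is only a worked version of that arithmetic, assuming every empty partition waits for the full 30 second timeout; `GlobalTableStartupDelay` and its methods are hypothetical names, not Kafka code, and the `parallel` variant is an idealized reading of proposal 2.

```java
import java.time.Duration;

// Hypothetical helper: models the delay described in the report, not Kafka code.
final class GlobalTableStartupDelay {

    // Sequential initialization: each empty partition waits for the full timeout in turn.
    static Duration sequential(int partitions, Duration timeoutPerPartition) {
        return timeoutPerPartition.multipliedBy(partitions);
    }

    // Idealized parallel initialization (proposal 2): the waits overlap,
    // so the delay no longer grows with the number of partitions.
    static Duration parallel(int partitions, Duration timeoutPerPartition) {
        return partitions == 0 ? Duration.ZERO : timeoutPerPartition;
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofSeconds(30);
        System.out.println(sequential(10, timeout).toMinutes()); // 5 minutes for 10 partitions
        System.out.println(sequential(96, timeout).toMinutes()); // 48 minutes for 96 partitions
        System.out.println(parallel(96, timeout).getSeconds());  // 30 seconds if waits overlap
    }
}
```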





[jira] [Comment Edited] (KAFKA-14713) Kafka Streams global table startup takes too long

2023-02-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690055#comment-17690055
 ] 

Matthias J. Sax edited comment on KAFKA-14713 at 2/16/23 11:52 PM:
---

Thanks for getting back. Glad it's resolved. I am not sure why, though. 
Comparing the 3.4 and 3.0 code, it seems they do the same thing.

In the end, if you have a valid checkpoint on restart, you should not even hit 
`poll(pollMsPlusRequestTimeout)` during restore, because it should hold that 
`offset == highWatermark` and we should not enter the while-loop...

For KAFKA-14442, we know that `offset == highWatermark - 1` (because we write 
the "incorrect" watermark into the checkpoint file), and thus `poll()` is 
executed and hangs because there is no data – the last "record" is just a 
commit marker.


was (Author: mjsax):
Thanks for getting back. Glad it's resolved. I am not sure why though. 
Comparing 3.4 and 3.0 code, it seems they do the same thing.

In the end, if you have valid checkpoint on restart, you should not even hit 
`poll(pollMsPlusRequestTimeout)` during restore, because it should hold that 
`offset == highWatermark` and we should not enter the while-loop...

For K14442, we know that `offset == highWatermark - 1` (because we write the 
"incorrect" watermark into the checkpoint file), and thus you thus `poll()` is 
executed and hang because there is no data – the last "record" is just a commit 
marker.

> Kafka Streams global table startup takes too long
> -
>
> Key: KAFKA-14713
> URL: https://issues.apache.org/jira/browse/KAFKA-14713
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.2
>Reporter: Tamas
>Priority: Critical
> Fix For: 3.4.0
>
>
> *Some context first*
> We have a spring based kafka streams application. This application is 
> listening to two topics. Let's call them apartment and visitor. The 
> apartments are stored in a global table, while the visitors are in the stream 
> we are processing, and at one point we are joining the visitor stream 
> together with the apartment table. In our test environment, both topics 
> contain 10 partitions.
> *Issue*
> At first deployment, everything goes fine, the global table is built and all 
> entries in the stream are processed.
> After everything is finished, we shut down the application, restart it and 
> send out a new set of visitors. The application seemingly does not respond.
> After some more debugging it turned out that it simply takes 5 minutes to 
> start up, because the global table takes 30 seconds (default value for the 
> global request timeout) to accept that there are no messages in the apartment 
> topics, for each and every partition. If we send out the list of apartments 
> as new messages, the application starts up immediately.
> To make matters worse, we have clients with 96 partitions, where the startup 
> time would be 48 minutes. Not having messages in the topics between 
> application shutdown and restart is a valid use case, so this is quite a big 
> problem.
> *Possible workarounds*
> We could reduce the request timeout, but since this value is not specific for 
> the global table initialization, but a global request timeout for a lot of 
> things, we do not know what else it will affect, so we are not very keen on 
> doing that. Even then, it would mean a 1.5 minute delay for this particular 
> client (more if we will have other use cases in the future where we will need 
> to use more global tables), which is far too much, considering that the 
> application would be able to otherwise start in about 20 seconds.
> *Potential solutions we see*
>  # Introduce a specific global table initialization timeout in 
> GlobalStateManagerImpl. Then we would be able to safely modify that value 
> without fear of making some other part of kafka unstable.
>  # Parallelize the initialization of the global table partitions in 
> GlobalStateManagerImpl: knowing that the delay at startup is constant instead 
> of linear with the number of partitions would be a huge help.
>  # As long as we receive a response, accept the empty map in the 
> KafkaConsumer, and continue instead of going into a busy-waiting loop.





[jira] [Commented] (KAFKA-14713) Kafka Streams global table startup takes too long

2023-02-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690055#comment-17690055
 ] 

Matthias J. Sax commented on KAFKA-14713:
-

Thanks for getting back. Glad it's resolved. I am not sure why, though. 
Comparing the 3.4 and 3.0 code, it seems they do the same thing.

In the end, if you have a valid checkpoint on restart, you should not even hit 
`poll(pollMsPlusRequestTimeout)` during restore, because it should hold that 
`offset == highWatermark` and we should not enter the while-loop...

For KAFKA-14442, we know that `offset == highWatermark - 1` (because we write 
the "incorrect" watermark into the checkpoint file), and thus `poll()` is 
executed and hangs because there is no data – the last "record" is just a 
commit marker.
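
The reasoning in this comment can be illustrated with a small model of the restore loop. This is a simplified sketch, not the actual `GlobalStateManagerImpl` code: `RestoreLoopModel`, the boolean log representation, and `maxEmptyPolls` (which stands in for the poll timeout) are all assumptions made for illustration.

```java
// Simplified model of the restore loop described in the comment; not Kafka code.
final class RestoreLoopModel {

    // log[i] is true if offset i holds a data record and false if it holds a
    // commit marker. The high watermark is modeled as log.length.
    // Returns how many polls came back empty before the loop exited or gave up.
    static int emptyPolls(long checkpointOffset, boolean[] log, int maxEmptyPolls) {
        long highWatermark = log.length;
        long offset = checkpointOffset;
        int emptyPolls = 0;
        // With a valid checkpoint, offset == highWatermark and the loop body never runs.
        while (offset < highWatermark && emptyPolls < maxEmptyPolls) {
            if (log[(int) offset]) {
                offset++;     // a data record was returned and restored
            } else {
                emptyPolls++; // only a commit marker is left, so the poll returns nothing
            }
        }
        return emptyPolls;
    }

    public static void main(String[] args) {
        boolean[] log = {true, true, false}; // two records followed by a commit marker
        System.out.println(emptyPolls(3, log, 1)); // checkpoint at the high watermark: no poll
        System.out.println(emptyPolls(2, log, 1)); // checkpoint one short: one empty poll
    }
}
```

In this model each empty poll corresponds to one full timeout spent waiting, which matches the per-partition delay described in the report.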

> Kafka Streams global table startup takes too long
> -
>
> Key: KAFKA-14713
> URL: https://issues.apache.org/jira/browse/KAFKA-14713
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.2
>Reporter: Tamas
>Priority: Critical
> Fix For: 3.4.0
>
>
> *Some context first*
> We have a spring based kafka streams application. This application is 
> listening to two topics. Let's call them apartment and visitor. The 
> apartments are stored in a global table, while the visitors are in the stream 
> we are processing, and at one point we are joining the visitor stream 
> together with the apartment table. In our test environment, both topics 
> contain 10 partitions.
> *Issue*
> At first deployment, everything goes fine, the global table is built and all 
> entries in the stream are processed.
> After everything is finished, we shut down the application, restart it and 
> send out a new set of visitors. The application seemingly does not respond.
> After some more debugging it turned out that it simply takes 5 minutes to 
> start up, because the global table takes 30 seconds (default value for the 
> global request timeout) to accept that there are no messages in the apartment 
> topics, for each and every partition. If we send out the list of apartments 
> as new messages, the application starts up immediately.
> To make matters worse, we have clients with 96 partitions, where the startup 
> time would be 48 minutes. Not having messages in the topics between 
> application shutdown and restart is a valid use case, so this is quite a big 
> problem.
> *Possible workarounds*
> We could reduce the request timeout, but since this value is not specific for 
> the global table initialization, but a global request timeout for a lot of 
> things, we do not know what else it will affect, so we are not very keen on 
> doing that. Even then, it would mean a 1.5 minute delay for this particular 
> client (more if we will have other use cases in the future where we will need 
> to use more global tables), which is far too much, considering that the 
> application would be able to otherwise start in about 20 seconds.
> *Potential solutions we see*
>  # Introduce a specific global table initialization timeout in 
> GlobalStateManagerImpl. Then we would be able to safely modify that value 
> without fear of making some other part of kafka unstable.
>  # Parallelize the initialization of the global table partitions in 
> GlobalStateManagerImpl: knowing that the delay at startup is constant instead 
> of linear with the number of partitions would be a huge help.
>  # As long as we receive a response, accept the empty map in the 
> KafkaConsumer, and continue instead of going into a busy-waiting loop.





[jira] [Updated] (KAFKA-14722) Make BooleanSerde public

2023-02-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14722:

Description: We introduce a "BooleanSerde" via 
[https://github.com/apache/kafka/pull/13249] as internal class. We could make 
it public.  (was: We introduce a "BooleanSerde via 
[https://github.com/apache/kafka/pull/13249] as internal class. We could make 
it public.)

> Make BooleanSerde public
> 
>
> Key: KAFKA-14722
> URL: https://issues.apache.org/jira/browse/KAFKA-14722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Spacrocket
>Priority: Minor
>  Labels: beginner, need-kip, newbie
>
> We introduce a "BooleanSerde" via 
> [https://github.com/apache/kafka/pull/13249] as internal class. We could make 
> it public.
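
For readers unfamiliar with what such a serde does, here is a dependency-free sketch of a boolean byte encoding. It is not the class from the linked pull request: the single-byte `0x01`/`0x00` encoding and the name `BooleanCodecSketch` are assumptions, and a real serde would implement Kafka's `Serializer<Boolean>` and `Deserializer<Boolean>` interfaces rather than static methods.

```java
// Dependency-free sketch of a boolean codec; the encoding is an assumption.
final class BooleanCodecSketch {
    private static final byte TRUE = 0x01;
    private static final byte FALSE = 0x00;

    // Encodes a Boolean as a single byte; null stays null (no value).
    static byte[] serialize(Boolean data) {
        if (data == null) {
            return null;
        }
        return new byte[] {data ? TRUE : FALSE};
    }

    // Decodes a single byte back into a Boolean and rejects anything else.
    static Boolean deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length != 1) {
            throw new IllegalArgumentException("Expected exactly 1 byte, got " + data.length);
        }
        if (data[0] == TRUE) {
            return Boolean.TRUE;
        }
        if (data[0] == FALSE) {
            return Boolean.FALSE;
        }
        throw new IllegalArgumentException("Unexpected byte value: " + data[0]);
    }
}
```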





[jira] [Commented] (KAFKA-14722) Make BooleanSerde public

2023-02-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17689401#comment-17689401
 ] 

Matthias J. Sax commented on KAFKA-14722:
-

Thanks for your interest. We will need a KIP for this change (cf. 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]).
It should not be hard to write the KIP and get it approved.

Let us know if you have any questions.

> Make BooleanSerde public
> 
>
> Key: KAFKA-14722
> URL: https://issues.apache.org/jira/browse/KAFKA-14722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Spacrocket
>Priority: Minor
>  Labels: beginner, need-kip, newbie
>
> We introduce a "BooleanSerde" via [https://github.com/apache/kafka/pull/13249] 
> as an internal class. We could make it public.





[jira] [Comment Edited] (KAFKA-14713) Kafka Streams global table startup takes too long

2023-02-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17689356#comment-17689356
 ] 

Matthias J. Sax edited comment on KAFKA-14713 at 2/15/23 9:44 PM:
--

What version are you using? Also, can you point me to the code where it 
actually waits/hangs? (As you already looked into it, it would be quicker 
this way.) I am not sure yet whether both issues are actually the same. 
(Maybe the "eos" config on the other ticket is a red herring.) But I guess 
we can dig into it a little bit.


was (Author: mjsax):
What version are you using? – Also, can you point me to the code where is 
actually waits/hangs (as you did already looked into it, it would be quicker 
this way). – I am not sure yet, if the issue is still the same though or not. 
But I guess we can dig into it a little bit.

> Kafka Streams global table startup takes too long
> -
>
> Key: KAFKA-14713
> URL: https://issues.apache.org/jira/browse/KAFKA-14713
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Tamas
>Priority: Critical
>
> *Some context first*
> We have a spring based kafka streams application. This application is 
> listening to two topics. Let's call them apartment and visitor. The 
> apartments are stored in a global table, while the visitors are in the stream 
> we are processing, and at one point we are joining the visitor stream 
> together with the apartment table. In our test environment, both topics 
> contain 10 partitions.
> *Issue*
> At first deployment, everything goes fine, the global table is built and all 
> entries in the stream are processed.
> After everything is finished, we shut down the application, restart it and 
> send out a new set of visitors. The application seemingly does not respond.
> After some more debugging it turned out that it simply takes 5 minutes to 
> start up, because the global table takes 30 seconds (default value for the 
> global request timeout) to accept that there are no messages in the apartment 
> topics, for each and every partition. If we send out the list of apartments 
> as new messages, the application starts up immediately.
> To make matters worse, we have clients with 96 partitions, where the startup 
> time would be 48 minutes. Not having messages in the topics between 
> application shutdown and restart is a valid use case, so this is quite a big 
> problem.
> *Possible workarounds*
> We could reduce the request timeout, but since this value is not specific for 
> the global table initialization, but a global request timeout for a lot of 
> things, we do not know what else it will affect, so we are not very keen on 
> doing that. Even then, it would mean a 1.5 minute delay for this particular 
> client (more if we will have other use cases in the future where we will need 
> to use more global tables), which is far too much, considering that the 
> application would be able to otherwise start in about 20 seconds.
> *Potential solutions we see*
>  # Introduce a specific global table initialization timeout in 
> GlobalStateManagerImpl. Then we would be able to safely modify that value 
> without fear of making some other part of kafka unstable.
>  # Parallelize the initialization of the global table partitions in 
> GlobalStateManagerImpl: knowing that the delay at startup is constant instead 
> of linear with the number of partitions would be a huge help.
>  # As long as we receive a response, accept the empty map in the 
> KafkaConsumer, and continue instead of going into a busy-waiting loop.





[jira] [Commented] (KAFKA-14713) Kafka Streams global table startup takes too long

2023-02-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17689356#comment-17689356
 ] 

Matthias J. Sax commented on KAFKA-14713:
-

What version are you using? Also, can you point me to the code where it 
actually waits/hangs? (As you already looked into it, it would be quicker 
this way.) I am not sure yet whether the issue is still the same. 
But I guess we can dig into it a little bit.

> Kafka Streams global table startup takes too long
> -
>
> Key: KAFKA-14713
> URL: https://issues.apache.org/jira/browse/KAFKA-14713
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Tamas
>Priority: Critical
>
> *Some context first*
> We have a spring based kafka streams application. This application is 
> listening to two topics. Let's call them apartment and visitor. The 
> apartments are stored in a global table, while the visitors are in the stream 
> we are processing, and at one point we are joining the visitor stream 
> together with the apartment table. In our test environment, both topics 
> contain 10 partitions.
> *Issue*
> At first deployment, everything goes fine, the global table is built and all 
> entries in the stream are processed.
> After everything is finished, we shut down the application, restart it and 
> send out a new set of visitors. The application seemingly does not respond.
> After some more debugging it turned out that it simply takes 5 minutes to 
> start up, because the global table takes 30 seconds (default value for the 
> global request timeout) to accept that there are no messages in the apartment 
> topics, for each and every partition. If we send out the list of apartments 
> as new messages, the application starts up immediately.
> To make matters worse, we have clients with 96 partitions, where the startup 
> time would be 48 minutes. Not having messages in the topics between 
> application shutdown and restart is a valid use case, so this is quite a big 
> problem.
> *Possible workarounds*
> We could reduce the request timeout, but since this value is not specific for 
> the global table initialization, but a global request timeout for a lot of 
> things, we do not know what else it will affect, so we are not very keen on 
> doing that. Even then, it would mean a 1.5 minute delay for this particular 
> client (more if we will have other use cases in the future where we will need 
> to use more global tables), which is far too much, considering that the 
> application would be able to otherwise start in about 20 seconds.
> *Potential solutions we see*
>  # Introduce a specific global table initialization timeout in 
> GlobalStateManagerImpl. Then we would be able to safely modify that value 
> without fear of making some other part of kafka unstable.
>  # Parallelize the initialization of the global table partitions in 
> GlobalStateManagerImpl: knowing that the delay at startup is constant instead 
> of linear with the number of partitions would be a huge help.
>  # As long as we receive a response, accept the empty map in the 
> KafkaConsumer, and continue instead of going into a busy-waiting loop.





[jira] [Created] (KAFKA-14722) Make BooleanSerde public

2023-02-15 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-14722:
---

 Summary: Make BooleanSerde public
 Key: KAFKA-14722
 URL: https://issues.apache.org/jira/browse/KAFKA-14722
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


We introduce a "BooleanSerde" via [https://github.com/apache/kafka/pull/13249] 
as an internal class. We could make it public.





[jira] [Updated] (KAFKA-14717) KafkaStreams can't get running if the rebalance happens before StreamThread gets shutdown completely

2023-02-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14717:

Component/s: streams

> KafkaStreams can't get running if the rebalance happens before StreamThread 
> gets shutdown completely
> ---
>
> Key: KAFKA-14717
> URL: https://issues.apache.org/jira/browse/KAFKA-14717
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 3.5.0
>
>
> I noticed this issue when tracing KAFKA-7109.
> StreamThread closes the consumer before changing state to DEAD. If the 
> partition rebalance happens quickly, the other StreamThreads can't change 
> the KafkaStreams state from REBALANCING to RUNNING since there is a 
> PENDING_SHUTDOWN StreamThread.
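
The blocked transition can be pictured with a small model. This sketch assumes, based on the description above, that the client only moves to RUNNING when every thread is either RUNNING or DEAD; `StreamsStateModel` and its reduced `ThreadState` enum are hypothetical and do not reproduce the real `KafkaStreams` or `StreamThread` classes.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of the client-level state check; not the actual Kafka Streams code.
final class StreamsStateModel {

    // Reduced set of thread states, enough to show the race.
    enum ThreadState { RUNNING, PARTITIONS_ASSIGNED, PENDING_SHUTDOWN, DEAD }

    // Assumption: the client may become RUNNING only if no thread is in an
    // intermediate state, i.e. every thread is RUNNING or DEAD.
    static boolean canTransitToRunning(List<ThreadState> threads) {
        for (ThreadState state : threads) {
            if (state != ThreadState.RUNNING && state != ThreadState.DEAD) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // The closing thread has released its consumer but is not DEAD yet.
        System.out.println(canTransitToRunning(
            Arrays.asList(ThreadState.RUNNING, ThreadState.PENDING_SHUTDOWN))); // false
        // Once the closing thread is DEAD, the check passes.
        System.out.println(canTransitToRunning(
            Arrays.asList(ThreadState.RUNNING, ThreadState.DEAD))); // true
    }
}
```

If nothing re-runs this check after the closing thread finally reaches DEAD, the client stays in REBALANCING, which is the symptom the ticket describes.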





[jira] [Commented] (KAFKA-14713) Kafka Streams global table startup takes too long

2023-02-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17688679#comment-17688679
 ] 

Matthias J. Sax commented on KAFKA-14713:
-

Sounds like a duplicate of https://issues.apache.org/jira/browse/KAFKA-14442. 
Can we close this ticket?

> Kafka Streams global table startup takes too long
> -
>
> Key: KAFKA-14713
> URL: https://issues.apache.org/jira/browse/KAFKA-14713
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Tamas
>Priority: Critical
>
> *Some context first*
> We have a spring based kafka streams application. This application is 
> listening to two topics. Let's call them apartment and visitor. The 
> apartments are stored in a global table, while the visitors are in the stream 
> we are processing, and at one point we are joining the visitor stream 
> together with the apartment table. In our test environment, both topics 
> contain 10 partitions.
> *Issue*
> At first deployment, everything goes fine, the global table is built and all 
> entries in the stream are processed.
> After everything is finished, we shut down the application, restart it and 
> send out a new set of visitors. The application seemingly does not respond.
> After some more debugging it turned out that it simply takes 5 minutes to 
> start up, because the global table takes 30 seconds (default value for the 
> global request timeout) to accept that there are no messages in the apartment 
> topics, for each and every partition. If we send out the list of apartments 
> as new messages, the application starts up immediately.
> To make matters worse, we have clients with 96 partitions, where the startup 
> time would be 48 minutes. Not having messages in the topics between 
> application shutdown and restart is a valid use case, so this is quite a big 
> problem.
> *Possible workarounds*
> We could reduce the request timeout, but since this value is not specific for 
> the global table initialization, but a global request timeout for a lot of 
> things, we do not know what else it will affect, so we are not very keen on 
> doing that. Even then, it would mean a 1.5 minute delay for this particular 
> client (more if we will have other use cases in the future where we will need 
> to use more global tables), which is far too much, considering that the 
> application would be able to otherwise start in about 20 seconds.
> *Potential solutions we see*
>  # Introduce a specific global table initialization timeout in 
> GlobalStateManagerImpl. Then we would be able to safely modify that value 
> without fear of making some other part of kafka unstable.
>  # Parallelize the initialization of the global table partitions in 
> GlobalStateManagerImpl: knowing that the delay at startup is constant instead 
> of linear with the number of partitions would be a huge help.
>  # As long as we receive a response, accept the empty map in the 
> KafkaConsumer, and continue instead of going into a busy-waiting loop.





Re: [VOTE] KIP-889 Versioned State Stores

2023-02-14 Thread Matthias J. Sax

Thanks Victoria. Makes sense to me.


On 2/13/23 5:55 PM, Victoria Xia wrote:

Hi everyone,

I have just pushed two minor amendments to KIP-889:

- Updated the versioned store specification to clarify that the *"history
retention" parameter is also used as "grace period,"* which means that
writes (including inserts, updates, and deletes) to the store will not be
accepted if the associated timestamp is older than the store's grace period
(i.e., history retention) relative to the current observed stream time.
   - Additional context: previously, the KIP was not explicit about
   if/when old writes would no longer be accepted. The reason for enforcing
   a strict grace period after which writes will no longer be accepted is
   because otherwise tombstones must be retained indefinitely -- if the
   latest value for a key is a very old tombstone, we would not be able to
   expire it from the store because if there’s an even older non-null put
   to the store later, then without the tombstone the store would accept
   this write as the latest value for the key, even though it isn't. In the
   spirit of not adding more to this KIP which has already been accepted, I
   do not propose to add additional interfaces to allow users to configure
   grace period separately from history retention at this time. Such
   options can be introduced in a future KIP in a backwards-compatible way.
- Added a *new method to TopologyTestDriver* for getting a versioned
store: getVersionedKeyValueStore().
   - This new method is analogous to existing methods for other types of
   stores, and its previous omission from the KIP was an oversight.
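
The grace-period rule in the first amendment can be sketched as a single comparison. This is an illustration of the rule as stated in the email, not the versioned store implementation: `GracePeriodSketch` and `isAccepted` are hypothetical names, and treating a write that lands exactly on the boundary as accepted is an assumption.

```java
// Illustrative check for the grace-period rule; names and boundary handling are assumptions.
final class GracePeriodSketch {

    // A write is rejected when its timestamp is older than
    // (observed stream time - history retention). Timestamps are in milliseconds.
    static boolean isAccepted(long recordTimestampMs, long observedStreamTimeMs, long historyRetentionMs) {
        return recordTimestampMs >= observedStreamTimeMs - historyRetentionMs;
    }

    public static void main(String[] args) {
        long streamTime = 1_000L;
        long historyRetention = 100L;
        System.out.println(isAccepted(950L, streamTime, historyRetention)); // true: within the grace period
        System.out.println(isAccepted(899L, streamTime, historyRetention)); // false: older than the grace period
    }
}
```

Because writes older than the boundary are never accepted, a tombstone older than the boundary can be expired safely: no later write could arrive with an even older timestamp and be mistaken for the latest value.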

If there are no concerns / objections, then perhaps these updates are minor
enough that we can proceed without re-voting.

Happy to discuss,
Victoria

On Wed, Dec 21, 2022 at 8:22 AM Victoria Xia wrote:


Hi everyone,

We have 3 binding and 1 non-binding vote in favor of this KIP (and no
objections) so KIP-889 is now accepted.

Thanks for voting, and for your excellent comments in the KIP discussion
thread!

Happy holidays,
Victoria

On Tue, Dec 20, 2022 at 12:24 PM Sagar  wrote:


Hi Victoria,

+1 (non-binding).

Thanks!
Sagar.

On Tue, Dec 20, 2022 at 1:39 PM Bruno Cadonna  wrote:


Hi Victoria,

Thanks for the KIP!

+1 (binding)

Best,
Bruno

On 19.12.22 20:03, Matthias J. Sax wrote:

+1 (binding)

On 12/15/22 1:27 PM, John Roesler wrote:

Thanks for the thorough KIP, Victoria!

I'm +1 (binding)

-John

On 2022/12/15 19:56:21 Victoria Xia wrote:

Hi all,

I'd like to start a vote on KIP-889 for introducing versioned key-value
state stores to Kafka Streams:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores

The discussion thread has been open for a few weeks now and has converged
among the current participants.

Thanks,
Victoria


[jira] [Updated] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)

2023-02-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14660:

Fix Version/s: 3.4.1

> Divide by zero security vulnerability (sonatype-2019-0422)
> --
>
> Key: KAFKA-14660
> URL: https://issues.apache.org/jira/browse/KAFKA-14660
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Andy Coates
>Assignee: Matthias J. Sax
>Priority: Minor
> Fix For: 3.5.0, 3.4.1
>
>
> Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR 
> and, because the PR was never merged, is now reporting it as a security 
> vulnerability in the latest Kafka Streams library.
>  
> See:
>  * [Vulnerability: 
> sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0)]
>  * [Original PR]([https://github.com/apache/kafka/pull/7414])
>  
> While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
> divide-by-zero is not really an issue, the fact that it's now being reported 
> as a vulnerability is, especially with regulators.
> PITA, but we should consider either getting this vulnerability removed 
> (Google wasn't very helpful in providing info on how to do this), or fixed 
> (Again, not sure how to tag the fix as fixing this issue).  One option may 
> just be to reopen the PR and merge (and then fix forward by switching it to 
> throw an exception).





[jira] [Commented] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)

2023-02-06 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684958#comment-17684958
 ] 

Matthias J. Sax commented on KAFKA-14660:
-

Correct. 3.4.0 is already voted and should be released soon. I plan to 
cherry-pick for 3.4.1 after 3.4.0 is out.

> Divide by zero security vulnerability (sonatype-2019-0422)
> --
>
> Key: KAFKA-14660
> URL: https://issues.apache.org/jira/browse/KAFKA-14660
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Andy Coates
>Assignee: Matthias J. Sax
>Priority: Minor
> Fix For: 3.5.0
>
>
> Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR 
> and, because the PR was never merged, is now reporting it as a security 
> vulnerability in the latest Kafka Streams library.
>  
> See:
>  * [Vulnerability: 
> sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0)]
>  * [Original PR]([https://github.com/apache/kafka/pull/7414])
>  
> While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
> divide-by-zero is not really an issue, the fact that it's now being reported 
> as a vulnerability is, especially with regulators.
> PITA, but we should consider either getting this vulnerability removed 
> (Google wasn't very helpful in providing info on how to do this), or fixed 
> (Again, not sure how to tag the fix as fixing this issue).  One option may 
> just be to reopen the PR and merge (and then fix forward by switching it to 
> throw an exception).





[jira] [Comment Edited] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)

2023-02-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17683951#comment-17683951
 ] 

Matthias J. Sax edited comment on KAFKA-14660 at 2/3/23 4:52 PM:
-

Not sure why the PR was not auto-linked... Fixed.

[https://github.com/apache/kafka/pull/13175]

Thanks for your follow up. Can we close this ticket? Let me know if there is 
anything else I can do.


was (Author: mjsax):
Not sure why the PR was not auto-linked... Fixed.

[https://github.com/apache/kafka/pull/13175]

> Divide by zero security vulnerability (sonatype-2019-0422)
> --
>
>                 Key: KAFKA-14660
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14660
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.2
>            Reporter: Andy Coates
>            Assignee: Matthias J. Sax
>            Priority: Minor
>             Fix For: 3.5.0
>
>
> Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR 
> and, because the PR was never merged, is now reporting it as a security 
> vulnerability in the latest Kafka Streams library.
>  
> See:
>  * [Vulnerability: sonatype-2019-0422](https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0)
>  * [Original PR](https://github.com/apache/kafka/pull/7414)
>  
> While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
> divide-by-zero is not really an issue, the fact that it's now being reported 
> as a vulnerability is, especially with regulators.
> PITA, but we should consider either getting this vulnerability removed 
> (Google wasn't very helpful in providing info on how to do this), or fixed 
> (Again, not sure how to tag the fix as fixing this issue).  One option may 
> just be to reopen the PR and merge (and then fix forward by switching it to 
> throw an exception).





[jira] [Commented] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)

2023-02-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17683951#comment-17683951
 ] 

Matthias J. Sax commented on KAFKA-14660:
-

Not sure why the PR was not auto-linked... Fixed.

[https://github.com/apache/kafka/pull/13175]

> Divide by zero security vulnerability (sonatype-2019-0422)
> --
>
>                 Key: KAFKA-14660
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14660
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.2
>            Reporter: Andy Coates
>            Assignee: Matthias J. Sax
>            Priority: Minor
>             Fix For: 3.5.0
>
>
> Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR 
> and, because the PR was never merged, is now reporting it as a security 
> vulnerability in the latest Kafka Streams library.
>  
> See:
>  * [Vulnerability: sonatype-2019-0422](https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0)
>  * [Original PR](https://github.com/apache/kafka/pull/7414)
>  
> While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
> divide-by-zero is not really an issue, the fact that it's now being reported 
> as a vulnerability is, especially with regulators.
> PITA, but we should consider either getting this vulnerability removed 
> (Google wasn't very helpful in providing info on how to do this), or fixed 
> (Again, not sure how to tag the fix as fixing this issue).  One option may 
> just be to reopen the PR and merge (and then fix forward by switching it to 
> throw an exception).





Re: [VOTE] KIP-890: Transactions Server Side Defense

2023-02-02 Thread Matthias J. Sax

Thanks for the KIP!

+1 (binding)


On 2/2/23 4:18 PM, Artem Livshits wrote:

(non-binding) +1.  Looking forward to the implementation and fixing the
issues that we've got.

-Artem

On Mon, Jan 23, 2023 at 2:25 PM Guozhang Wang wrote:

Thanks Justine, I have no further comments on the KIP. +1.

On Tue, Jan 17, 2023 at 10:34 AM Jason Gustafson wrote:

+1. Thanks Justine!

-Jason

On Tue, Jan 10, 2023 at 3:46 PM Colt McNealy wrote:

(non-binding) +1. Thank you for the KIP, Justine! I've read it; it makes
sense to me and I am excited for the implementation.

Colt McNealy
*Founder, LittleHorse.io*

On Tue, Jan 10, 2023 at 10:46 AM Justine Olshan wrote:

Hi everyone,

I would like to start a vote on KIP-890 which aims to prevent some of the
common causes of hanging transactions and make other general improvements
to transactions in Kafka.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense

Please take a look if you haven't already and vote!

Justine


Re: Coralogix Logo on Powered By Page

2023-02-01 Thread Matthias J. Sax

Thanks for reaching out.

Can you open a PR against https://github.com/apache/kafka-site updating 
`powered-by.html`?



-Matthias

On 2/1/23 1:13 AM, Tali Soroker wrote:

Hi,
I am writing on behalf of Coralogix to request adding us to the Powered 
By page on the Apache Kafka website.


I am attaching our logo and here is a description of our usage for your 
consideration:


Coralogix uses Kafka Streams to power our observability platform
that ingests and analyzes up to tens of billions of messages per
day. Using Kafka Streams, we are able to decouple analytics from
indexing and storage to provide our users with the best performance,
scalability, and cost.


Best,
Tali


--
Tali Soroker
Product Marketing
+972 058-681-1707
coralogix.com



[jira] [Commented] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)

2023-01-31 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682648#comment-17682648
 ] 

Matthias J. Sax commented on KAFKA-14660:
-

The original PR did not make sense: if totalCapacity were really zero, there 
would be a bug, and just setting it to 1 does not sound right. I already 
merged a new PR that raises an exception for this case and thus avoids the 
divide-by-zero. This should resolve the issue.
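
As a rough illustration of the approach described above (not the code from the merged PR), a guard of the following kind fails fast instead of dividing by zero. The class name, the method name, and the choice of IllegalStateException are assumptions made for this sketch; only totalCapacity comes from the comment.

```java
// Hypothetical sketch; not the actual Kafka Streams assignor code.
final class CapacityGuardSketch {

    // Integer division throws ArithmeticException on a zero divisor, so the
    // guard rejects a non-positive totalCapacity with a descriptive error
    // instead of silently substituting 1.
    static int tasksPerThread(final int taskCount, final int totalCapacity) {
        if (totalCapacity <= 0) {
            throw new IllegalStateException(
                "totalCapacity must be positive but was " + totalCapacity);
        }
        return taskCount / totalCapacity;
    }

    public static void main(final String[] args) {
        System.out.println(tasksPerThread(10, 4));
        try {
            tasksPerThread(10, 0);
        } catch (final IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Raising an exception surfaces the underlying bug at the point where the invalid state is detected, which is the reasoning given in the comment for not defaulting the value to 1.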

> Divide by zero security vulnerability (sonatype-2019-0422)
> --
>
>                 Key: KAFKA-14660
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14660
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.2
>            Reporter: Andy Coates
>            Assignee: Matthias J. Sax
>            Priority: Minor
>             Fix For: 3.5.0
>
>
> Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR 
> and, because the PR was never merged, is now reporting it as a security 
> vulnerability in the latest Kafka Streams library.
>  
> See:
>  * [Vulnerability: sonatype-2019-0422](https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0)
>  * [Original PR](https://github.com/apache/kafka/pull/7414)
>  
> While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
> divide-by-zero is not really an issue, the fact that it's now being reported 
> as a vulnerability is, especially with regulators.
> PITA, but we should consider either getting this vulnerability removed 
> (Google wasn't very helpful in providing info on how to do this), or fixed 
> (Again, not sure how to tag the fix as fixing this issue).  One option may 
> just be to reopen the PR and merge (and then fix forward by switching it to 
> throw an exception).





[jira] [Assigned] (KAFKA-14650) IQv2 can throw ConcurrentModificationException when accessing Tasks

2023-01-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-14650:
---

Assignee: Guozhang Wang

> IQv2 can throw ConcurrentModificationException when accessing Tasks 
> 
>
>                 Key: KAFKA-14650
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14650
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.4.0
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: Guozhang Wang
>            Priority: Major
>
> From failure in *[PositionRestartIntegrationTest.verifyStore[cache=false, 
> log=true, supplier=IN_MEMORY_WINDOW, 
> kind=PAPI]|https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.4/63/testReport/junit/org.apache.kafka.streams.integration/PositionRestartIntegrationTest/Build___JDK_11_and_Scala_2_13___verifyStore_cache_false__log_true__supplier_IN_MEMORY_WINDOW__kind_PAPI_/]*
> java.util.ConcurrentModificationException
>   at 
> java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208)
>   at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244)
>   at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239)
>   at java.base/java.util.HashMap.putMapEntries(HashMap.java:508)
>   at java.base/java.util.HashMap.putAll(HashMap.java:781)
>   at 
> org.apache.kafka.streams.processor.internals.Tasks.allTasksPerId(Tasks.java:361)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1537)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.allTasks(StreamThread.java:1278)
>   at org.apache.kafka.streams.KafkaStreams.query(KafkaStreams.java:1921)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.iqv2WaitForResult(IntegrationTestUtils.java:168)
>   at 
> org.apache.kafka.streams.integration.PositionRestartIntegrationTest.shouldReachExpectedPosition(PositionRestartIntegrationTest.java:438)
>   at 
> org.apache.kafka.streams.integration.PositionRestartIntegrationTest.verifyStore(PositionRestartIntegrationTest.java:423)
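
The trace above shows HashMap.putAll iterating a TreeMap while the map is being changed elsewhere. The sketch below is a hypothetical, simplified registry (not Kafka's Tasks class, and not the fix that was applied). It reproduces the fail-fast iterator behavior deterministically in a single thread and shows one possible mitigation: taking the snapshot under the same lock that writers use. All class and method names are assumptions.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; names do not correspond to Kafka Streams internals.
final class TaskRegistrySketch {
    private final Map<String, String> tasks = new TreeMap<>();

    // Writers mutate the map while holding the registry's monitor.
    synchronized void add(final String taskId, final String task) {
        tasks.put(taskId, task);
    }

    // Readers copy under the same monitor, so the copy constructor's
    // iteration never observes a concurrent structural change.
    synchronized Map<String, String> snapshot() {
        return new HashMap<>(tasks);
    }

    // Single-threaded reproduction: a structural modification between two
    // calls to next() makes the TreeMap iterator fail fast.
    static boolean iterationFailsAfterModification() {
        final Map<String, String> map = new TreeMap<>();
        map.put("0_0", "a");
        map.put("0_1", "b");
        final Iterator<Map.Entry<String, String>> it = map.entrySet().iterator();
        it.next();
        map.put("0_2", "c");
        try {
            it.next();
            return false;
        } catch (final ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println(iterationFailsAfterModification());
    }
}
```

Copying under a lock is only one option; a concurrent map or confining all access to a single thread would be alternatives with different trade-offs.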





[jira] [Commented] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)

2023-01-30 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682401#comment-17682401
 ] 

Matthias J. Sax commented on KAFKA-14646:
-

I was thinking about this issue, and I think the only way to upgrade is to 
"drain" your topology. I.e., you would need to stop your upstream producers and 
not send any new input data. Afterwards, you let KS finish processing all 
input data (including all data from internal topics, i.e., 
repartition, fk-subscription, and fk-response topics), to really "drain" the 
topology completely. Next, do a two-round rolling bounce using `upgrade.from`, 
and finally resume your upstream producers.

Would you be willing to try this out (and report back)?
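
The configuration side of the two-round rolling bounce can be sketched with plain java.util.Properties. The key `upgrade.from` is the one named above; the value "3.2" (chosen to match the 3.2.3 starting version) and the class and method names are assumptions for illustration, and the accepted values should be checked against the StreamsConfig documentation of the target release.

```java
import java.util.Properties;

// Hypothetical sketch of the per-round configuration; not an official procedure.
final class UpgradeFromSketch {

    // Round 1: instances restart on the new version with upgrade.from
    // pointing at the old version (the value "3.2" is an assumption).
    static Properties firstBounce(final Properties base) {
        final Properties props = new Properties();
        props.putAll(base);
        props.setProperty("upgrade.from", "3.2");
        return props;
    }

    // Round 2: instances restart again with upgrade.from removed.
    static Properties secondBounce(final Properties base) {
        final Properties props = new Properties();
        props.putAll(base);
        props.remove("upgrade.from");
        return props;
    }

    public static void main(final String[] args) {
        final Properties base = new Properties();
        base.setProperty("application.id", "example-app");
        System.out.println(firstBounce(base));
        System.out.println(secondBounce(firstBounce(base)));
    }
}
```

Draining the topology and pausing the upstream producers happen outside of this configuration and are not modeled here.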

> SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 
> 3.3.2)
> 
>
>                 Key: KAFKA-14646
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14646
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.0, 3.3.1, 3.3.2
>         Environment: Kafka Streams 3.2.3 (before update)
> Kafka Streams 3.3.2 (after update)
> Java 17 (Eclipse Temurin 17.0.5), Linux x86_64
>            Reporter: Jochen Schalanda
>            Priority: Major
>             Fix For: 3.4.0, 3.3.3
>
>
> Hey folks,
>  
> we've just updated an application from *_Kafka Streams 3.2.3 to 3.3.2_* and 
> started getting the following exceptions:
> {code:java}
> org.apache.kafka.common.errors.UnsupportedVersionException: 
> SubscriptionWrapper is of an incompatible version. {code}
> After swiftly looking through the code, this exception is potentially thrown 
> in two places:
>  * 
> [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java#L73-L78]
>  ** Here the check was changed in Kafka 3.3.x: 
> [https://github.com/apache/kafka/commit/9dd25ecd9ce17e608c6aba98e0422b26ed133c12]
>  * 
> [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java#L94-L99]
>  ** Here the check wasn't changed.
>  
> Is it possible that the second check in 
> {{SubscriptionStoreReceiveProcessorSupplier}} was forgotten?
>  
> Any hints how to resolve this issue without a downgrade?
> Since this only affects 2 of 15 topologies in the application, I'm hesitant 
> to just downgrade to Kafka 3.2.3 again since the internal topics might 
> already have been updated to use the "new" version of 
> {{SubscriptionWrapper}}.
>  
> Related discussion in the Confluent Community Slack: 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1674497054507119]
> h2. Stack trace
> {code:java}
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_8, 
> processor=XXX-joined-changed-fk-subscription-registration-source, 
> topic=topic.rev7-XXX-joined-changed-fk-subscription-registration-topic, 
> partition=8, offset=12297976, 
> stacktrace=org.apache.kafka.common.errors.UnsupportedVersionException: 
> SubscriptionWrapper is of an incompatible version.
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:770)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
>  {code}





[jira] [Commented] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)

2023-01-30 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682291#comment-17682291
 ] 

Matthias J. Sax commented on KAFKA-14660:
-

Hey Andy – thanks for the ticket. We would have accepted a PR, too. :D

> Divide by zero security vulnerability (sonatype-2019-0422)
> --
>
>                 Key: KAFKA-14660
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14660
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.2
>            Reporter: Andy Coates
>            Assignee: Matthias J. Sax
>            Priority: Minor
>
> Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR 
> and, because the PR was never merged, is now reporting it as a security 
> vulnerability in the latest Kafka Streams library.
>  
> See:
>  * [Vulnerability: sonatype-2019-0422](https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0)
>  * [Original PR](https://github.com/apache/kafka/pull/7414)
>  
> While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
> divide-by-zero is not really an issue, the fact that it's now being reported 
> as a vulnerability is, especially with regulators.
> PITA, but we should consider either getting this vulnerability removed 
> (Google wasn't very helpful in providing info on how to do this), or fixed 
> (Again, not sure how to tag the fix as fixing this issue).  One option may 
> just be to reopen the PR and merge (and then fix forward by switching it to 
> throw an exception).





[jira] [Assigned] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)

2023-01-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-14660:
---

Assignee: Matthias J. Sax

> Divide by zero security vulnerability (sonatype-2019-0422)
> --
>
>                 Key: KAFKA-14660
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14660
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.2
>            Reporter: Andy Coates
>            Assignee: Matthias J. Sax
>            Priority: Minor
>
> Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR 
> and, because the PR was never merged, is now reporting it as a security 
> vulnerability in the latest Kafka Streams library.
>  
> See:
>  * [Vulnerability: sonatype-2019-0422](https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0)
>  * [Original PR](https://github.com/apache/kafka/pull/7414)
>  
> While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
> divide-by-zero is not really an issue, the fact that it's now being reported 
> as a vulnerability is, especially with regulators.
> PITA, but we should consider either getting this vulnerability removed 
> (Google wasn't very helpful in providing info on how to do this), or fixed 
> (Again, not sure how to tag the fix as fixing this issue).  One option may 
> just be to reopen the PR and merge (and then fix forward by switching it to 
> throw an exception).





[jira] [Resolved] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)

2023-01-25 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14646.
-
Fix Version/s: 3.4.0
   3.3.3
   Resolution: Fixed

> SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 
> 3.3.2)
> 
>
>                 Key: KAFKA-14646
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14646
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.2
>         Environment: Kafka Streams 3.2.3 (before update)
> Kafka Streams 3.3.2 (after update)
> Java 17 (Eclipse Temurin 17.0.5), Linux x86_64
>            Reporter: Jochen Schalanda
>            Priority: Major
>             Fix For: 3.4.0, 3.3.3
>
>
> Hey folks,
>  
> we've just updated an application from *_Kafka Streams 3.2.3 to 3.3.2_* and 
> started getting the following exceptions:
> {code:java}
> org.apache.kafka.common.errors.UnsupportedVersionException: 
> SubscriptionWrapper is of an incompatible version. {code}
> After swiftly looking through the code, this exception is potentially thrown 
> in two places:
>  * 
> [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java#L73-L78]
>  ** Here the check was changed in Kafka 3.3.x: 
> [https://github.com/apache/kafka/commit/9dd25ecd9ce17e608c6aba98e0422b26ed133c12]
>  * 
> [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java#L94-L99]
>  ** Here the check wasn't changed.
>  
> Is it possible that the second check in 
> {{SubscriptionStoreReceiveProcessorSupplier}} was forgotten?
>  
> Any hints how to resolve this issue without a downgrade?
> Since this only affects 2 of 15 topologies in the application, I'm hesitant 
> to just downgrade to Kafka 3.2.3 again since the internal topics might 
> already have been updated to use the "new" version of 
> {{SubscriptionWrapper}}.
>  
> Related discussion in the Confluent Community Slack: 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1674497054507119]
> h2. Stack trace
> {code:java}
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_8, 
> processor=XXX-joined-changed-fk-subscription-registration-source, 
> topic=topic.rev7-XXX-joined-changed-fk-subscription-registration-topic, 
> partition=8, offset=12297976, 
> stacktrace=org.apache.kafka.common.errors.UnsupportedVersionException: 
> SubscriptionWrapper is of an incompatible version.
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:770)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
>  {code}





[jira] [Commented] (KAFKA-13769) KTable FK join can miss records if an upstream non-key-changing operation changes key serializer

2023-01-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680858#comment-17680858
 ] 

Matthias J. Sax commented on KAFKA-13769:
-

I just updated the fix version from 3.3.0 to 3.3.3 and 3.4.0. Cf. 
https://issues.apache.org/jira/browse/KAFKA-14646 for details.

> KTable FK join can miss records if an upstream non-key-changing operation 
> changes key serializer
> 
>
>                 Key: KAFKA-13769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13769
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Alex Sorokoumov
>            Assignee: Alex Sorokoumov
>            Priority: Major
>             Fix For: 3.4.0, 3.3.3
>
>
> Consider a topology, where the source KTable is followed by a 
> {{transformValues}} operation [that changes the key 
> schema|https://github.com/apache/kafka/blob/db724f23f38cdb6c668a10681ea2a03bb11611ad/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L452]
>  followed by a foreign key join. The FK join might miss records in such a 
> topology because they might be sent to the wrong partitions.
> As {{transformValues}} does not change the key itself, repartition won't 
> happen after this operation. However, the KTable instance that calls 
> {{doJoinOnForeignKey}} uses the new serde coming from {{transformValues}} 
> rather than the original. As a result, all nodes in the FK join topology 
> except for 
> [SubscriptionResolverJoinProcessorSupplier|https://github.com/apache/kafka/blob/db724f23f38cdb6c668a10681ea2a03bb11611ad/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1225-L1232]
>  use the "new" serde. {{SubscriptionResolverJoinProcessorSupplier}} uses the 
> old one because it uses 
> [valueGetterSupplier|https://github.com/apache/kafka/blob/db724f23f38cdb6c668a10681ea2a03bb11611ad/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1225]
>  that in turn will retrieve the records from the topic.
> A different serializer might serialize keys to different series of bytes, 
> which will lead to sending them to the wrong partitions. To run into that 
> issue, multiple things must happen:
> * a topic should have more than one partition,
> * KTable's serializer should be modified via a non-key-changing operation,
> * the new serializer should serialize keys differently
> In practice, it might happen if the key type is a {{Struct}} because it 
> serializes to a JSON string {{columnName -> value}}. If the 
> {{transformValues}} operation changes column names to avoid name clashes with 
> the joining table, such join can lead to incorrect behavior.
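
To make the partitioning hazard in the description concrete, here is a minimal sketch in plain Java. It does not use Kafka's serializers or its default partitioner: Arrays.hashCode stands in for the real hash function, and the JSON-like encoding, the column names, and all class and method names are assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch; not Kafka's partitioner or any real serde.
final class SerdePartitionSketch {

    // Stand-in for key-hash partitioning: the partition depends only on the
    // serialized bytes, not on the logical key.
    static int partitionFor(final byte[] keyBytes, final int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    // Toy serializer that embeds the column name, in the spirit of the
    // "columnName -> value" encoding mentioned in the description.
    static byte[] serialize(final String columnName, final int value) {
        return ("{\"" + columnName + "\":" + value + "}")
            .getBytes(StandardCharsets.UTF_8);
    }

    public static void main(final String[] args) {
        final byte[] original = serialize("id", 42);
        final byte[] renamed = serialize("left_id", 42);
        // Same logical key, different bytes, so the two sides may disagree
        // about which partition holds the record.
        System.out.println(Arrays.equals(original, renamed));
        System.out.println(partitionFor(original, 8));
        System.out.println(partitionFor(renamed, 8));
    }
}
```

Whether two encodings of the same key actually land on different partitions depends on the hash and the partition count, which matches the "might miss records" wording in the description.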





[jira] [Updated] (KAFKA-13769) KTable FK join can miss records if an upstream non-key-changing operation changes key serializer

2023-01-25 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13769:

Fix Version/s: 3.4.0
   3.3.3
   (was: 3.3.0)

> KTable FK join can miss records if an upstream non-key-changing operation 
> changes key serializer
> 
>
>                 Key: KAFKA-13769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13769
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Alex Sorokoumov
>            Assignee: Alex Sorokoumov
>            Priority: Major
>             Fix For: 3.4.0, 3.3.3
>
>
> Consider a topology, where the source KTable is followed by a 
> {{transformValues}} operation [that changes the key 
> schema|https://github.com/apache/kafka/blob/db724f23f38cdb6c668a10681ea2a03bb11611ad/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L452]
>  followed by a foreign key join. The FK join might miss records in such a 
> topology because they might be sent to the wrong partitions.
> As {{transformValues}} does not change the key itself, repartition won't 
> happen after this operation. However, the KTable instance that calls 
> {{doJoinOnForeignKey}} uses the new serde coming from {{transformValues}} 
> rather than the original. As a result, all nodes in the FK join topology 
> except for 
> [SubscriptionResolverJoinProcessorSupplier|https://github.com/apache/kafka/blob/db724f23f38cdb6c668a10681ea2a03bb11611ad/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1225-L1232]
>  use the "new" serde. {{SubscriptionResolverJoinProcessorSupplier}} uses the 
> old one because it uses 
> [valueGetterSupplier|https://github.com/apache/kafka/blob/db724f23f38cdb6c668a10681ea2a03bb11611ad/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1225]
>  that in turn will retrieve the records from the topic.
> A different serializer might serialize keys to different series of bytes, 
> which will lead to sending them to the wrong partitions. To run into that 
> issue, multiple things must happen:
> * a topic should have more than one partition,
> * KTable's serializer should be modified via a non-key-changing operation,
> * the new serializer should serialize keys differently
> In practice, it might happen if the key type is a {{Struct}} because it 
> serializes to a JSON string {{columnName -> value}}. If the 
> {{transformValues}} operation changes column names to avoid name clashes with 
> the joining table, such join can lead to incorrect behavior.





[jira] [Updated] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)

2023-01-25 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14646:

Affects Version/s: 3.3.1
   3.3.0

> SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 
> 3.3.2)
> 
>
>                 Key: KAFKA-14646
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14646
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.0, 3.3.1, 3.3.2
>         Environment: Kafka Streams 3.2.3 (before update)
> Kafka Streams 3.3.2 (after update)
> Java 17 (Eclipse Temurin 17.0.5), Linux x86_64
>            Reporter: Jochen Schalanda
>            Priority: Major
>             Fix For: 3.4.0, 3.3.3
>
>
> Hey folks,
>  
> we've just updated an application from *_Kafka Streams 3.2.3 to 3.3.2_* and 
> started getting the following exceptions:
> {code:java}
> org.apache.kafka.common.errors.UnsupportedVersionException: 
> SubscriptionWrapper is of an incompatible version. {code}
> After swiftly looking through the code, this exception is potentially thrown 
> in two places:
>  * 
> [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java#L73-L78]
>  ** Here the check was changed in Kafka 3.3.x: 
> [https://github.com/apache/kafka/commit/9dd25ecd9ce17e608c6aba98e0422b26ed133c12]
>  * 
> [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java#L94-L99]
>  ** Here the check wasn't changed.
>  
> Is it possible that the second check in 
> {{SubscriptionStoreReceiveProcessorSupplier}} was forgotten?
>  
> Any hints how to resolve this issue without a downgrade?
> Since this only affects 2 of 15 topologies in the application, I'm hesitant 
> to just downgrade to Kafka 3.2.3 again since the internal topics might 
> already have been updated to use the "new" version of 
> {{SubscriptionWrapper}}.
>  
> Related discussion in the Confluent Community Slack: 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1674497054507119]
> h2. Stack trace
> {code:java}
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_8, 
> processor=XXX-joined-changed-fk-subscription-registration-source, 
> topic=topic.rev7-XXX-joined-changed-fk-subscription-registration-topic, 
> partition=8, offset=12297976, 
> stacktrace=org.apache.kafka.common.errors.UnsupportedVersionException: 
> SubscriptionWrapper is of an incompatible version.
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:770)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
>  {code}





[jira] [Commented] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)

2023-01-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680857#comment-17680857
 ] 

Matthias J. Sax commented on KAFKA-14646:
-

Ok. Talked to Alex and he quickly figured it out – it's quite embarrassing... 
The last two PRs for https://issues.apache.org/jira/browse/KAFKA-13769 did not 
land in the 3.3 release branch (it seems we forgot to cherry-pick them after 
merging to `trunk`). So we indeed have a bug in 3.3.2 (and 3.3.1 and 3.3.0), 
and it's only fixed in 3.4.0. Bottom line: KAFKA-13769 is not really fixed in 
3.3.2 or earlier, only in 3.4.0, which should be available soon.

I just cherry-picked both commits to the 3.3 branch, so if there is a 3.3.3 
release, it will pick up the fix.

Thanks for reporting this! Will close this ticket now. Feel free to follow up 
in the comments.


Re: [VOTE] KIP-869: Improve Streams State Restoration Visibility

2023-01-25 Thread Matthias J. Sax

Thanks!

+1 (binding)

-Matthias

On 1/24/23 1:17 PM, Guozhang Wang wrote:

Hi Matthias:

re "paused" -> "suspended": I got your point now, thanks. Just to
clarify, the two are a bit different: "paused" tasks are paused because
the topology is paused, i.e. via KIP-834; whereas "suspended" tasks are
restoring tasks that are removed before restoration completes due to a
follow-up rebalance, and the name distinguishes this case from
"onRestoreEnd", as described in KAFKA-10575. A suspended task is no
longer owned by the thread, and hence there's no need to measure the
number of such tasks.

re: "restore-ratio": that's a good point. I'd like it to function in the
same way as the "records-rate" metrics. Will update the wiki.

re: making "restore-remaining-records-total" at INFO level: sounds
good to me too. I will also update the metric name a bit to be more
specific.



On Thu, Jan 19, 2023 at 2:35 PM Guozhang Wang
 wrote:


Hello Matthias,

Thanks for the feedback. I was on vacation for a while. Apologies for the
late replies. Please see them inline below.

On Thu, Dec 1, 2022 at 11:23 PM Matthias J. Sax  wrote:


Seems I am late to the party... Great KIP. Couple of questions from my side:

(1) What is the purpose of `standby-updating-tasks`? It seems to be the
same as the number of assigned standby tasks? Not sure how useful it
would be?


In general, yes, it is the number of assigned standby tasks --- there
will be transient periods when the assigned standby tasks are not yet
being updated, but they should not last long --- but we do not yet have
a direct gauge that exposes this, and users have to infer it from
other indirect metrics.




(2) `active-paused-tasks` / `standby-paused-tasks` -- what does "paused"
exactly mean? There was a discussion about renaming the callback method
from pause to suspended. So should this be called `suspended`, too? And
if yes, how is it useful for users?


Pausing here refers to "KIP-834: Pause / Resume KafkaStreams
Topologies" 
(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832).
When a topology is paused, all its tasks including standbys will be
paused too.

I'm not aware of a discussion to rename the call name to "suspend" for
KIP-834. Could you point me to the reference?




(3) `restore-ratio`: the description says


The fraction of time the thread spent on restoring active or standby tasks


I find the term "restoring" applies only to active tasks, not to
standbys. Can we reword this?


Yeah, I have been discussing this with others in the community a bit as
well, but so far I have not been convinced that there is a better name.
Some other alternatives that were discussed but did not win everyone's
love are "restore-or-update-ratio", "process-ratio" (for the restore
thread that means restoring or updating), and "io-ratio".

The only one so far that I feel is probably better, is
"state-update-ratio". If folks feel this one is better than
"restore-ratio" I'm happy to update.



(4) `restore-call-rate`: not sure what you exactly mean by "restore calls"?


This is similar to the "io-calls-rate" in the selector classes, i.e.
the number of "restore" function calls made. It's arguably a very
low-level metric, but I included it since it could be useful in some
debugging scenarios.



(5) `restore-remaining-records-total` -- why is this a task metric?
Seems we could roll it up into a thread metric that we report at INFO
level (we could still have per-task DEBUG level metric for it in addition).


The rationale behind it is the general principle in metrics design
that "Kafka would provide the lowest necessary metrics levels, and
users can do the roll-ups however they want".
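
To make the roll-up idea concrete, here is a minimal sketch of how a user
could sum a task-level "restore-remaining-records-total" value into one
number per thread. The class name, the nested-map input, and the thread and
task ids are assumptions for illustration only; this is not the KIP's
implementation and it does not use the Kafka metrics API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: rolls task-level remaining-record counts up to thread level. */
final class RemainingRecordsRollup {
    /**
     * Input: thread id -> (task id -> remaining records for that task).
     * Output: thread id -> sum of the remaining records of all its tasks.
     */
    static Map<String, Long> perThread(Map<String, Map<String, Long>> remainingByThreadAndTask) {
        Map<String, Long> totals = new LinkedHashMap<>();
        remainingByThreadAndTask.forEach((thread, tasks) ->
            totals.put(thread, tasks.values().stream().mapToLong(Long::longValue).sum()));
        return totals;
    }
}
```

The trade-off discussed in the thread is about who does this summation: with
task-level metrics only, every user writes something like the above; with a
thread-level metric, Kafka Streams would report the total directly.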



(6) What about "warmup tasks"? Internally, we treat them as standbys,
but it seems it's hard for users to reason about it in the scale-out
warm-up case. Would it be helpful (and possible) to report "warmup
progress" explicitly?


At the restore thread level, we cannot differentiate standby tasks
from warmup tasks since the latter are created exactly like the
former. But I do agree this is a visibility issue that is worth
addressing; I think another KIP would be needed to first consider
distinguishing these two at the class level.



-Matthias


On 11/1/22 2:44 AM, Lucas Brutschy wrote:

We need this!

+ 1 non binding

Cheers,
Lucas

On Tue, Nov 1, 2022 at 10:01 AM Bruno Cadonna  wrote:


Guozhang,

Thanks for the KIP!

+1 (binding)

Best,
Bruno

On 25.10.22 22:07, Walker Carlson wrote:

+1 non binding

Thanks for the kip!

On Thu, Oct 20, 2022 at 10:25 PM John Roesler  wrote:


Thanks for the KIP, Guozhang!

I'm +1 (binding)

-John

On Wed, Oct 12, 2022, at 16:36, Nick Telford wrote:

Can't wait!
+1 (non-binding)

On Wed, 12 Oct 2022, 18:02 Guozhang Wang, 
wr

Re: [DISCUSS] KIP-890 Server Side Defense

2023-01-25 Thread Matthias J. Sax

would it build an offset map with just the latest timestamp for a key?


Cannot remember the details without reading the KIP, but yes, something 
like this (I believe it actually needs to track both, offset and 
timestamp per key).



I wonder if ordering assumptions are baked in there, why not use offset-based 
compaction.


The use case is a compacted topic that contains out-of-order data. 
If you set key k1 = v1 @ 5 at offset 100 and later k1 = v0 @ 3 at offset 
200, we want to clean up v0, which has the higher offset, because it is 
out-of-order based on time, but keep v1, which is the actual latest 
version of k1.
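
The k1 example can be sketched in a few lines of Java. This is an
illustration of the two compaction strategies only, not the log cleaner's
code: the `Entry` class, its field names, and the tie-break on offset for
equal timestamps are assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CompactionSketch {
    /** One log entry; field names are illustrative, not Kafka's record format. */
    static final class Entry {
        final String key;
        final String value;
        final long timestamp;
        final long offset;

        Entry(String key, String value, long timestamp, long offset) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.offset = offset;
        }
    }

    /** Offset-based compaction: per key, the entry with the highest offset survives. */
    static Map<String, Entry> compactByOffset(List<Entry> log) {
        Map<String, Entry> survivors = new LinkedHashMap<>();
        for (Entry e : log) {
            survivors.merge(e.key, e, (kept, next) -> next.offset > kept.offset ? next : kept);
        }
        return survivors;
    }

    /** Timestamp-based compaction: highest timestamp survives; offset breaks ties (assumed). */
    static Map<String, Entry> compactByTimestamp(List<Entry> log) {
        Map<String, Entry> survivors = new LinkedHashMap<>();
        for (Entry e : log) {
            survivors.merge(e.key, e, (kept, next) -> {
                if (next.timestamp != kept.timestamp) {
                    return next.timestamp > kept.timestamp ? next : kept;
                }
                return next.offset > kept.offset ? next : kept;
            });
        }
        return survivors;
    }
}
```

With the two entries from the example (v1 @ 5 at offset 100, v0 @ 3 at
offset 200), `compactByOffset` keeps v0 while `compactByTimestamp` keeps v1,
which is the difference the paragraph above describes.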




I was also not aware of this "guarantee" with regards to broker side time.


As already said: I am not sure if it's a public contract, but based on 
my experience, people might rely on it as an "implicit contract". -- Maybe 
somebody else knows if it's public or not, and if it would be ok to 
"break" it.



Let me know if you have any concerns here.


My understanding is: While we cannot make an offset-order guarantee for 
interleaved writes of different producers, if the topic is configured 
with "append_time", we "guarantee" (cf. my comment above) timestamp 
order... If that's the case, it would be an issue if we break this 
"guarantee".


I am not sure when the broker sets the timestamp for the "append_time" 
config. If we do it before putting the request into purgatory, we have a 
problem. However, if we set the timestamp when we actually process the 
request and do the actual append, it seems there is no issue, as the 
request that was waiting in purgatory gets the "newest" timestamp and 
thus cannot introduce out-of-order data.
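
The two cases can be played through with a toy model. The sketch below is
not broker code; `Request`, its fields, and the two stamping modes are
hypothetical and only model when the timestamp is taken relative to the
wait in purgatory.

```java
import java.util.ArrayList;
import java.util.List;

final class AppendTimeSketch {
    /** A produce request: when it arrived at the broker and when it was actually appended. */
    static final class Request {
        final long arrivedAt;
        final long appendedAt;

        Request(long arrivedAt, long appendedAt) {
            this.arrivedAt = arrivedAt;
            this.appendedAt = appendedAt;
        }
    }

    /** Timestamps as they would appear in the log; requests must be given in append order. */
    static List<Long> logTimestamps(List<Request> inAppendOrder, boolean stampOnArrival) {
        List<Long> timestamps = new ArrayList<>();
        for (Request r : inAppendOrder) {
            timestamps.add(stampOnArrival ? r.arrivedAt : r.appendedAt);
        }
        return timestamps;
    }

    /** True if no timestamp is smaller than its predecessor in the log. */
    static boolean isNonDecreasing(List<Long> timestamps) {
        for (int i = 1; i < timestamps.size(); i++) {
            if (timestamps.get(i) < timestamps.get(i - 1)) {
                return false;
            }
        }
        return true;
    }
}
```

For example, if request A arrives at t=1 but waits in purgatory until t=3,
and request B arrives and is appended at t=2, the log order is B, A.
Stamping on arrival yields timestamps 2, 1 (out of order); stamping at
append time yields 2, 3 (in order).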



-Matthias


On 1/24/23 10:44 AM, Justine Olshan wrote:

Hey Matthias,

I have actually never heard of KIP-280 so thanks for bringing it up. That
seems interesting. I wonder how it would work though -- would it build an
offset map with just the latest timestamp for a key? I wonder if ordering
assumptions are baked in there, why not use offset-based compaction.

I was also not aware of this "guarantee" with regards to broker side time.
I think that we can do in order handling for a given producer, but not
across all producers. However, we can't guarantee that anyway.

Let me know if you have any concerns here.

Thanks,
Justine

On Mon, Jan 23, 2023 at 6:33 PM Matthias J. Sax  wrote:


Just a side note about Guozhang's comments about timestamps.

If the producer sets the timestamp, putting the record into purgatory
seems not to be an issue (as already said: for this case we don't
guarantee timestamp order between writes of different producers anyway).
However, if the broker sets the timestamp, the expectation is that there
is no out-of-order data in the partition ever; if we would introduce
out-of-order data for this case (for interleaved writes of different
producers), it seems we would violate the current contract? (To be fair:
I don't know if that's an official contract, but I assume people rely on
this behavior -- and it is "advertised" in many public talks...)

About compaction: there is actually KIP-280, which adds timestamp-based
compaction, a very useful feature for Kafka Streams with regard
to out-of-order data handling. So the impact of introducing
out-of-order data could be larger in scope.


-Matthias


On 1/20/23 4:48 PM, Justine Olshan wrote:

Hey Artem,

I see there is a check for transactional producers. I'm wondering if we
don't handle the epoch overflow case. I'm also not sure it will be a huge
issue to extend to transactional producers, but maybe I'm missing
something.

As for the recovery path -- I think Guozhang's point was if we have a bad
client that repeatedly tries to produce without adding to the transaction
we would do the following:
a) if not fatal, we just fail the produce request over and over
b) if fatal, we fence the producer

Here with B, the issue with the client would be made clear more quickly. I
suppose there are some intermediate cases where the issue only occurs
sometimes, but I wonder if we should consider how to recover with clients
who don't behave as expected anyway.

I think there is a place for the abortable error that we are adding --
just abort and try again. But I think there are also some cases where
trying to recover overcomplicates some logic. Especially if we are
considering older clients -- there I'm not sure if there's a ton we can do
besides fail the batch or fence the producer. With newer clients, we can
consider more options for what can just be recovered after aborting. But
epochs might be a hard one unless we also want to reset producer ID.

Thanks,
Justine



On Fri, Jan 20, 2023 at 3:59 PM Artem Livshits
 wrote:


   besides the poorly written client case


A poorly written client could create a lot of grief to people who run
Kafka brokers :-), so when deciding to make an error fatal I wou

[jira] [Commented] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)

2023-01-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680840#comment-17680840
 ] 

Matthias J. Sax commented on KAFKA-14646:
-

Could be – let me sync with Alex who worked on the ticket I linked above – need 
to think about it a little bit.


Re: [VOTE] KIP-869: Improve Streams State Restoration Visibility

2023-01-23 Thread Matthias J. Sax

Thanks Guozhang. Couple of clarifications and follow up questions.



I'm not aware of a discussion to rename the call name to "suspend" for
KIP-834. Could you point me to the reference?


My comment was not about KIP-834, but about this KIP. You originally 
proposed to call the new callback `onRestorePause`, but to avoid 
confusion it was renamed to `onRestoreSuspended`.




The only one so far that I feel is probably better, is
"state-update-ratio". If folks feel this one is better than
"restore-ratio" I'm happy to update.


Could we actually report two metrics, one for the restore phase 
(restore-ratio), and one for steady state ([standby-]update-ratio)?


I could live with `state-update-ratio` if we want to have a single 
metric for both, but splitting them sounds useful to me.
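
As a rough illustration of the split, the sketch below tracks both ratios
and the combined one over the same measurement window. The class and method
names are hypothetical and the code does not use the Kafka metrics API; it
only shows that a ratio here means "time spent on this activity divided by
total thread time".

```java
import java.time.Duration;

/** Hypothetical helper: fractions of thread time spent restoring vs. updating standbys. */
final class StateUpdaterRatios {
    private long restoreNanos = 0L;
    private long standbyUpdateNanos = 0L;
    private long totalNanos = 0L;

    /** Records one loop iteration and how its time was split between the two activities. */
    void record(Duration iteration, Duration restoring, Duration updatingStandbys) {
        if (restoring.plus(updatingStandbys).compareTo(iteration) > 0) {
            throw new IllegalArgumentException("activity time exceeds iteration time");
        }
        totalNanos += iteration.toNanos();
        restoreNanos += restoring.toNanos();
        standbyUpdateNanos += updatingStandbys.toNanos();
    }

    /** Fraction of time spent restoring active tasks; 0 if nothing was recorded. */
    double restoreRatio() {
        return totalNanos == 0L ? 0.0 : (double) restoreNanos / totalNanos;
    }

    /** Fraction of time spent updating standby tasks; 0 if nothing was recorded. */
    double standbyUpdateRatio() {
        return totalNanos == 0L ? 0.0 : (double) standbyUpdateNanos / totalNanos;
    }

    /** The single combined metric: the sum of the two ratios above. */
    double stateUpdateRatio() {
        return restoreRatio() + standbyUpdateRatio();
    }
}
```

A single `state-update-ratio` corresponds to `stateUpdateRatio()`; reporting
the two parts separately lets a user tell a thread that is busy restoring
from one that is busy keeping standbys current.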




(4) `restore-call-rate`


Maybe we can clarify in the description a little bit. I agree it's very 
low level, but if you think it could be useful for debugging, I have no 
objection.




The rationale behind it is the general principle in metrics design
that "Kafka would provide the lowest necessary metrics levels, and
users can do the roll-ups however they want".


That's fair, but it seems to be a rather important metric, and having it 
only at DEBUG level seems not ideal? Could we make it INFO level, even 
if it's a task-level metric (i.e., make an exception to the rule)?




-Matthias




Re: [DISCUSS] KIP-890 Server Side Defense

2023-01-23 Thread Matthias J. Sax
rkerRequests with the bumped epoch."

Hmm, the epoch is associated with the current txn right? So, it seems
weird to write a commit message with a bumped epoch. Should we only bump
up the epoch in EndTxnResponse and rename the field to sth like
nextProducerEpoch?

Thanks,

Jun


On Mon, Dec 12, 2022 at 8:54 PM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for the background.

20/30: SGTM. My proposal was only focusing to avoid dangling
transactions if records are added without registered partition. -- Maybe
you can add a few more details to the KIP about this scenario for better
documentation purpose?

40: I think you hit a fair point about race conditions or client bugs
(incorrectly not bumping the epoch). The complexity/confusion for using
the bumped epoch I see, is mainly for internal debugging, ie, inspecting
log segment dumps -- it seems harder to reason about the system for us
humans. But if we get better guarantees, it would be worth to use the
bumped epoch.

60: as I mentioned already, I don't know the broker internals to provide
more input. So if nobody else chimes in, we should just move forward
with your proposal.


-Matthias


On 12/6/22 4:22 PM, Justine Olshan wrote:

Hi all,
After Artem's questions about error behavior, I've re-evaluated the
unknown producer ID exception and had some discussions offline.

I think generally it makes sense to simplify error handling in cases like
this and the UNKNOWN_PRODUCER_ID error has a pretty long and complicated
history. Because of this, I propose adding a new error code
ABORTABLE_ERROR that when encountered by new clients (gated by the
produce request version) will simply abort the transaction. This allows
the server to have some say in whether the client aborts and makes
handling much simpler. In the future, we can also use this error in other
situations where we want to abort the transactions. We can even use on
other apis.

I've added this to the KIP. Let me know if there are any questions or
issues.

Justine

On Fri, Dec 2, 2022 at 10:22 AM Justine Olshan <jols...@confluent.io>
wrote:

Hey Matthias,

20/30 -- Maybe I also didn't express myself clearly. For older clients we
don't have a way to distinguish between a previous and the current
transaction since we don't have the epoch bump. This means that a late
message from the previous transaction may be added to the new one. With
older clients -- we can't guarantee this won't happen if we already sent
the addPartitionsToTxn call (why we make changes for the newer client)
but we can at least gate some by ensuring that the partition has been
added to the transaction. The rationale here is that there are likely
LESS late arrivals as time goes on, so hopefully most late arrivals will
come in BEFORE the addPartitionsToTxn call. Those that arrive before will
be properly gated with the describeTransactions approach.

If we take the approach you suggested, ANY late arrival from a previous
transaction will be added. And we don't want that. I also don't see any
benefit in sending addPartitionsToTxn over the describeTxns call. They
will both be one extra RPC to the Txn coordinator.

To be clear -- newer clients will use addPartitionsToTxn instead of the
DescribeTxns.

40)
My concern is that if we have some delay in the client to bump the epoch,
it could continue to send epoch 73 and those records would not be fenced.
Perhaps this is not an issue if we don't allow the next produce to go
through before the EndTxn request returns. I'm also thinking about cases
of failure. I will need to think on this a bit.

I wasn't sure if it was that confusing. But if we think it is, we can
investigate other ways.

60)

I'm not sure these are the same purgatories since one is a produce
purgatory (I was planning on using a callback rather than purgatory) and
the other is simply a request to append to the log. Not sure we have any
structure here for ordering, but my understanding is that the broker
could handle the write request before it hears back from the Txn
Coordinator.

Let me know if I misunderstood something or something was unclear.

Justine

On Thu, Dec 1, 2022 at 12:15 PM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for the details Justine!

20)

The client side change for 2 is removing the addPartitions to transaction
call. We don't need to make this from the producer to the txn
coordinator, only server side.

I think I did not express myself clearly. I understand that we can (and
should) change t

Re: Custom Kafka Streams State Restore Logic

2023-01-23 Thread Matthias J. Sax

Thanks.

I agree. Seems your options are limited. The API is not really a good 
fit for what you want to do... Sorry.


-Matthias

On 1/18/23 7:48 AM, Upesh Desai wrote:

Hi Matthias, thanks for your reply! Sure, so the use case is as follows.

We currently store some time series data in the state store, and it is 
stored to a changelog as well. The time series data is bucketed (5 
minutes, 1 hour, and 1 day). Our goal was to always only have a max of 2 
time buckets in the store at once. As we receive new timeseries data, we 
figure out what time bucket it belongs to, and add it to its respective 
bucket. We have a “grace period” which allows for late arriving data to 
be processed even after a time bucket has ended. That’s the reason why 
we have this constraint of 2 time buckets max within the store; 1 for 
the previous bucket in its grace period, 1 for the current bucket.
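
The bucketing rule described above can be sketched as follows. This is a
simplified stand-in, not the poster's implementation: the class name is
hypothetical, it stores only a count per bucket, and it assumes the grace
period is exactly one bucket length, so that only the newest bucket and the
one directly before it are ever retained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical tracker that keeps at most two time buckets: previous (in grace) and current. */
final class TwoBucketTracker {
    private final long bucketSizeMs;
    // bucket start (epoch millis) -> number of data points seen for that bucket
    private final TreeMap<Long, Long> countsByBucketStart = new TreeMap<>();

    TwoBucketTracker(long bucketSizeMs) {
        if (bucketSizeMs <= 0) {
            throw new IllegalArgumentException("bucketSizeMs must be positive");
        }
        this.bucketSizeMs = bucketSizeMs;
    }

    /** Start of the bucket that contains the given timestamp. */
    static long bucketStart(long timestampMs, long bucketSizeMs) {
        return Math.floorDiv(timestampMs, bucketSizeMs) * bucketSizeMs;
    }

    /** Adds a data point; returns false if it is older than the previous bucket. */
    boolean add(long timestampMs) {
        long start = bucketStart(timestampMs, bucketSizeMs);
        long newest = countsByBucketStart.isEmpty()
            ? start
            : Math.max(start, countsByBucketStart.lastKey());
        long oldestAllowed = newest - bucketSizeMs;
        if (start < oldestAllowed) {
            return false; // arrived after its grace period ended
        }
        countsByBucketStart.merge(start, 1L, Long::sum);
        // Evict every bucket that is older than the previous one.
        countsByBucketStart.headMap(oldestAllowed).clear();
        return true;
    }

    /** Starts of the retained buckets, oldest first. */
    List<Long> bucketStarts() {
        return new ArrayList<>(countsByBucketStart.keySet());
    }
}
```

With 5-minute buckets, a late data point for the previous bucket is still
accepted, while a point two or more buckets behind the newest one is
rejected and its bucket is no longer held in memory.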


So we wanted to extend the base state store and add a simple in-memory 
map to track the 2 time buckets per timeseries (that’s the store key). A 
couple of reasons why we don’t want to add this as a separate state store 
or to the existing store are:

1. There is a ton of serialization / deserialization that happens behind 
the scenes

2. This new time bucket tracking map would only be updated a couple of 
times per time bucket, and does not need to be updated on every message 
read.

3. There’s no API on the included stores that allows us to do so

Therefore, I thought it best to try to use the existing store 
functionality, create a “new state store” that really just instantiates 
one of the included stores within, add this in memory map, and then plug 
into/alter/extend the restore functionality to populate the time bucket 
tracking map during restore time.


It sounds like I will either have to 1) create a custom state store from 
scratch, or 2) see if there is a post-restore hook that can then call a 
method to scan the whole store and build up the time bucket map before 
starting to process.
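
Option 2 could look roughly like the sketch below. It is plain Java with a
`HashMap` standing in for the wrapped built-in store, so every name in it is
hypothetical and none of it is Kafka Streams API. The point is only the
shape: restoration writes to the inner store and bypasses the in-memory
index, so one full scan after restoration rebuilds the index before
processing starts. Kafka Streams does have a `StateRestoreListener` with an
`onRestoreEnd` callback; whether it is a suitable place to trigger such a
scan is an open question here, not something this sketch demonstrates.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical wrapper store with a derived in-memory index of buckets per series. */
final class BucketTrackingStore {
    // Stand-in for the wrapped built-in store; keys are "<series>@<bucketStartMs>".
    private final Map<String, Double> inner = new HashMap<>();
    // Derived index: series -> bucket starts. Never written to the changelog.
    private final Map<String, TreeSet<Long>> bucketsBySeries = new HashMap<>();

    /** Normal write path: updates the inner store and the index together. */
    void put(String series, long bucketStartMs, double value) {
        inner.put(series + "@" + bucketStartMs, value);
        bucketsBySeries.computeIfAbsent(series, s -> new TreeSet<>()).add(bucketStartMs);
    }

    /** Simulates changelog restoration, which only touches the inner store. */
    void restore(Map<String, Double> changelog) {
        inner.putAll(changelog);
    }

    /** Post-restore step: one full scan of the inner store rebuilds the index. */
    void rebuildIndex() {
        bucketsBySeries.clear();
        for (String key : inner.keySet()) {
            int separator = key.lastIndexOf('@');
            String series = key.substring(0, separator);
            long bucketStartMs = Long.parseLong(key.substring(separator + 1));
            bucketsBySeries.computeIfAbsent(series, s -> new TreeSet<>()).add(bucketStartMs);
        }
    }

    /** Bucket starts currently indexed for a series (empty if none). */
    SortedSet<Long> buckets(String series) {
        return bucketsBySeries.getOrDefault(series, new TreeSet<>());
    }
}
```

The cost of this approach is the full scan, which is paid once per
restoration rather than on every record.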


Any advice on Kafka streams / state store logic would be appreciated!

-Upesh


From: Matthias J. Sax
Date: Wednesday, January 18, 2023 at 12:50 AM
To: users@kafka.apache.org
Subject: Re: Custom Kafka Streams State Restore Logic

Guess it depends what you actually want to achieve?

Also note: `InMemoryWindowStore` is an internal class, and thus might
change at any point, and it was never designed to be extended...


-Matthias

On 1/13/23 2:55 PM, Upesh Desai wrote:

Hello all,

I am currently working on creating a new InMemoryWindowStore, by 
extending the default in memory window store. One of the roadblocks I’ve 
run into is finding a way to add some custom logic when the state store 
is being restored from the changelog. I know that this is possible if I 
completely write the store logic from scratch, but we really only want 
to add a tiny bit of custom logic, and do not want to have to replicate 
all the existing logic.


Is there a simple way for this to be done? I see the default 
implementation in InMemoryWindowStore:


context.register(
    root,
    (RecordBatchingStateRestoreCallback) records -> {
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            put(
                Bytes.wrap(extractStoreKeyBytes(record.key())),
                record.value(),
                extractStoreTimestamp(record.key())
            );
            ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
                record,
                consistencyEnabled,
                position
            );
        }
    }
);

Thanks in advance!

Upesh






[jira] [Commented] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)

2023-01-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680042#comment-17680042
 ] 

Matthias J. Sax commented on KAFKA-14646:
-

Thanks for following up – glad to hear that it's in the docs... And I hope it 
resolved the problem.

> SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 
> 3.3.2)
> 
>
> Key: KAFKA-14646
> URL: https://issues.apache.org/jira/browse/KAFKA-14646
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
> Environment: Kafka Streams 3.2.3 (before update)
> Kafka Streams 3.3.2 (after update)
> Java 17 (Eclipse Temurin 17.0.5), Linux x86_64
>Reporter: Jochen Schalanda
>Priority: Major
>
> Hey folks,
>  
> we've just updated an application from *_Kafka Streams 3.2.3 to 3.3.2_* and 
> started getting the following exceptions:
> {code:java}
> org.apache.kafka.common.errors.UnsupportedVersionException: 
> SubscriptionWrapper is of an incompatible version. {code}
> After swiftly looking through the code, this exception is potentially thrown 
> in two places:
>  * 
> [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java#L73-L78]
>  ** Here the check was changed in Kafka 3.3.x: 
> [https://github.com/apache/kafka/commit/9dd25ecd9ce17e608c6aba98e0422b26ed133c12]
>  * 
> [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java#L94-L99]
>  ** Here the check wasn't changed.
>  
> Is it possible that the second check in 
> {{SubscriptionStoreReceiveProcessorSupplier}} was forgotten?
>  
> Any hints how to resolve this issue without a downgrade?
> Since this only affects 2 of 20+ topologies in the application, I'm hesitant 
> to just downgrade to Kafka 3.2.3 again since the internal topics might 
> already have been updated to use the "new" version of 
> {{SubscriptionWrapper}}.
>  
> Related discussion in the Confluent Community Slack: 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1674497054507119]
> h2. Stack trace
> {code:java}
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_8, 
> processor=XXX-joined-changed-fk-subscription-registration-source, 
> topic=topic.rev7-XXX-joined-changed-fk-subscription-registration-topic, 
> partition=8, offset=12297976, 
> stacktrace=org.apache.kafka.common.errors.UnsupportedVersionException: 
> SubscriptionWrapper is of an incompatible version.
> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750)
> at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
> at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
> at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:770)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
>  {code}
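To make the difference between the two checks linked in the report concrete, here is a small plain-Java sketch. It is an assumption, not a copy of the Kafka Streams source: the report only says one check "was changed" and the other was not, and this sketch assumes the change was from a strict equality test to an upper-bound test. `VersionCheckSketch` and both method names are hypothetical.

```java
/** Hypothetical stand-ins; these are not the actual Kafka Streams classes. */
final class VersionCheckSketch {
    /** Strict check: accepts only exactly the version this instance was built with. */
    static boolean strictAccepts(final byte received, final byte current) {
        return received == current;
    }

    /** Lenient check (assumed form): accepts any version up to the one this instance understands. */
    static boolean lenientAccepts(final byte received, final byte current) {
        return received <= current;
    }
}
```

Under these assumptions, an upgraded instance that reads a record written with an older version would reject it with the strict check but accept it with the lenient one, while an instance that has not been upgraded yet rejects a newer version either way.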





[jira] [Comment Edited] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)

2023-01-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17679991#comment-17679991
 ] 

Matthias J. Sax edited comment on KAFKA-14646 at 1/23/23 7:21 PM:
--

Did you upgrade with two rolling bounces, leveraging the `upgrade.from` config?

I assume this is related to https://issues.apache.org/jira/browse/KAFKA-13769

Of course, KAFKA-13769 could have introduced some bug, but we actually do test 
rolling upgrades and would hope the tests would have caught it (otherwise, we 
need to improve our testing...)

Unfortunately, we lack some docs on the web page about the required 
two-rolling-bounce upgrade path; it slipped. We have it in the backlog to 
add the missing docs asap.


was (Author: mjsax):
Did you upgrade with two rolling bounces, leveraging the `upgrade.from` config?

I assume this is related to https://issues.apache.org/jira/browse/KAFKA-13769

Unfortunately, we lack some docs on the web page about the required 
two-rolling-bounce upgrade path; it slipped. We have it in the backlog to 
add the missing docs asap.
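The two-rolling-bounce path mentioned in the comment could look roughly like the sketch below. Only the `upgrade.from` and `application.id` keys are real Kafka Streams configs; `RollingUpgradeConfigs` and its methods are hypothetical helpers, and the value to pass (for example `"3.2"` when coming from 3.2.x) is an assumption that should be checked against the upgrade guide of the target release.

```java
import java.util.Properties;

/** Hypothetical helper; only the "upgrade.from" key is a real Kafka Streams config. */
final class RollingUpgradeConfigs {
    static final String UPGRADE_FROM = "upgrade.from";

    /** First bounce: deploy the new jar, but tell it which version it is upgrading from. */
    static Properties firstBounce(final Properties base, final String oldVersion) {
        final Properties props = new Properties();
        props.putAll(base);
        props.setProperty(UPGRADE_FROM, oldVersion);
        return props;
    }

    /** Second bounce: same jar, with "upgrade.from" removed again. */
    static Properties secondBounce(final Properties firstBounceProps) {
        final Properties props = new Properties();
        props.putAll(firstBounceProps);
        props.remove(UPGRADE_FROM);
        return props;
    }
}
```

Each method returns a copy, so the base configuration stays untouched and the same base can be reused for both bounces.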






[jira] [Commented] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)

2023-01-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17679991#comment-17679991
 ] 

Matthias J. Sax commented on KAFKA-14646:
-

Did you upgrade with two rolling bounces, leveraging the `upgrade.from` config?

I assume this is related to https://issues.apache.org/jira/browse/KAFKA-13769

Unfortunately, we lack some docs on the web page about the required 
two-rolling-bounce upgrade path; it slipped. We have it in the backlog to 
add the missing docs asap.






[jira] [Resolved] (KAFKA-14638) Documentation for transaction.timeout.ms should be more precise

2023-01-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14638.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

> Documentation for transaction.timeout.ms should be more precise
> ---
>
> Key: KAFKA-14638
> URL: https://issues.apache.org/jira/browse/KAFKA-14638
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Alex Sorokoumov
>Assignee: Alex Sorokoumov
>Priority: Minor
> Fix For: 3.5.0
>
>
> The documentation for {{transaction.timeout.ms}} is
> {quote}
> The maximum amount of time in ms that the transaction coordinator will wait 
> for a transaction status update from the producer before proactively aborting 
> the ongoing transaction. If this value is larger than the 
> transaction.max.timeout.ms setting in the broker, the request will fail with 
> a InvalidTxnTimeoutException error.
> {quote}
> It would be easier to reason about this timeout if the documentation would 
> elaborate on when the timer starts ticking and under what circumstances it 
> might reset.
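The relationship between the two settings quoted above can be sketched as a simple pre-flight check. This is a plain-Java illustration, not Kafka client code: `TxnTimeoutCheck` and `fitsBrokerLimit` are hypothetical names, and the default values in the constants are the documented defaults as far as I know.

```java
import java.util.Properties;

/** Hypothetical pre-flight check; the real validation happens on the broker. */
final class TxnTimeoutCheck {
    // Documented defaults: producer transaction.timeout.ms = 60000 (1 minute),
    // broker transaction.max.timeout.ms = 900000 (15 minutes).
    static final long DEFAULT_PRODUCER_TIMEOUT_MS = 60_000L;
    static final long DEFAULT_BROKER_MAX_TIMEOUT_MS = 900_000L;

    /** True if the producer setting does not exceed the broker-side cap. */
    static boolean fitsBrokerLimit(final Properties producerProps, final long brokerMaxTimeoutMs) {
        final String raw = producerProps.getProperty("transaction.timeout.ms");
        final long requested = raw == null ? DEFAULT_PRODUCER_TIMEOUT_MS : Long.parseLong(raw);
        return requested <= brokerMaxTimeoutMs;
    }
}
```

The check says nothing about when the timer starts or resets, which is exactly the gap this ticket asks the documentation to fill.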





