[GitHub] [kafka] kamalcph commented on a diff in pull request #14151: KAFKA-15083: add config with "remote.log.metadata" prefix
kamalcph commented on code in PR #14151: URL: https://github.com/apache/kafka/pull/14151#discussion_r1284973824

## storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:

@@ -134,6 +134,8 @@ public final class RemoteLogManagerConfig {
 "less than or equal to `log.retention.bytes` value.";
 public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;
+public static final String REMOTE_LOG_METADATA_PREFIX = "remote.log.metadata";

Review Comment: Should we append the `PROP` (or) `CONFIG` suffix to it?

## storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:

@@ -134,6 +134,8 @@ public final class RemoteLogManagerConfig {
 "less than or equal to `log.retention.bytes` value.";
 public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;
+public static final String REMOTE_LOG_METADATA_PREFIX = "remote.log.metadata";

Review Comment: Should we append the `_PROP` (or) `_CONFIG` suffix to it?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12890) Consumer group stuck in `CompletingRebalance`
[ https://issues.apache.org/jira/browse/KAFKA-12890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751269#comment-17751269 ] Yang commented on KAFKA-12890:

Hi [~dajac], I know this issue and PR have been closed for a while; I just wonder if you have any broker-side log for this issue that you can share. Thanks!

> Consumer group stuck in `CompletingRebalance`
> -
>
> Key: KAFKA-12890
> URL: https://issues.apache.org/jira/browse/KAFKA-12890
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2
> Reporter: David Jacot
> Assignee: David Jacot
> Priority: Blocker
> Fix For: 2.8.1, 3.0.0
>
> We have recently seen multiple consumer groups stuck in `CompletingRebalance`. It appears that those groups never receive the assignment from the leader of the group and remain stuck in this state forever.
> When a group transitions to the `CompletingRebalance` state, the group coordinator sets up a `DelayedHeartbeat` for each member of the group. It does so to ensure that the member sends a sync request within the session timeout; if it does not, the group coordinator rebalances the group. Note that `DelayedHeartbeat` is used for this purpose, and a `DelayedHeartbeat` is also completed when its member heartbeats.
> The issue is that https://github.com/apache/kafka/pull/8834 changed the heartbeat logic to allow members to heartbeat while the group is in the `CompletingRebalance` state; this was not allowed before. Now, if a member starts to heartbeat while the group is in `CompletingRebalance`, the heartbeat request completes the pending `DelayedHeartbeat` that was set up to catch a missing sync request. Therefore, if the sync request never comes, the group coordinator no longer notices.
> We need to bring that behavior back somehow.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
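The failure mode described in the ticket above can be captured in a toy model. This is my own sketch with hypothetical names, not coordinator code; it only illustrates why completing the single `DelayedHeartbeat` on a heartbeat silently disarms the sync-request deadline:

```java
// Toy model of KAFKA-12890 (a sketch, not the real GroupCoordinator):
// one DelayedHeartbeat both tracks member liveness and enforces that a
// SyncGroup request arrives while the group is in CompletingRebalance.
public class SyncDeadlineModel {
    boolean completingRebalance = true;  // group is waiting for the leader's assignment
    boolean delayedHeartbeatArmed = true; // timer that would trigger a rebalance if no sync arrives

    // Post-#8834 behavior: a heartbeat is allowed in CompletingRebalance and
    // completes the pending DelayedHeartbeat, disarming the sync deadline too.
    void onHeartbeat() {
        delayedHeartbeatArmed = false;
    }

    // With the deadline disarmed and no SyncGroup ever arriving, nothing
    // moves the group out of CompletingRebalance: it is stuck forever.
    boolean stuckForever() {
        return completingRebalance && !delayedHeartbeatArmed;
    }
}
```

With the timer armed the group would eventually rebalance; after a single heartbeat, the model is stuck.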
[jira] [Updated] (KAFKA-15309) Add custom error handler to Producer
[ https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15309:

Description: The producer collects multiple records into batches, and a single record-specific error might fail the whole batch (eg, `RecordTooLargeException`). This ticket suggests adding a per-record error handler that allows users to opt into skipping bad records without failing the whole batch (similar to Kafka Streams `ProductionExceptionHandler`). The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused https://issues.apache.org/jira/browse/KAFKA-15259, which inspired this ticket. Another example for which a production exception handler could be useful: if a user tries to write into a non-existing topic, which returns a retryable error code, then with infinite retries the producer would hang retrying forever. A handler could help to break the infinite retry loop.

was: The producer collects multiple records into batches, and a single record specific error might fail the whole batch (eg, `RecordTooLargeException`). This ticket suggests to add a per-record error handler, that allows user to opt into skipping bad records without failing the whole batch (similar to Kafka Streams `ProductionExceptionHandler`). The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket. Another example for which a production exception handler would be useful, if a user tries to write into a non-existing topic, which returns a retryable error code; with infinite retries the producer would hang retrying forever.

> Add custom error handler to Producer
>
> Key: KAFKA-15309
> URL: https://issues.apache.org/jira/browse/KAFKA-15309
> Project: Kafka
> Issue Type: New Feature
> Components: producer
> Reporter: Matthias J. Sax
> Priority: Major
> Labels: needs-kip
>
> The producer collects multiple records into batches, and a single record-specific error might fail the whole batch (eg, `RecordTooLargeException`).
> This ticket suggests adding a per-record error handler that allows users to opt into skipping bad records without failing the whole batch (similar to Kafka Streams `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused https://issues.apache.org/jira/browse/KAFKA-15259, which inspired this ticket.
> Another example for which a production exception handler could be useful: if a user tries to write into a non-existing topic, which returns a retryable error code, then with infinite retries the producer would hang retrying forever. A handler could help to break the infinite retry loop.
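The handler the ticket proposes does not exist yet (the ticket is labeled needs-kip), but its shape can be sketched by analogy with the Streams `ProductionExceptionHandler`. Everything below is hypothetical: the interface name, method signature, and the stand-in exception class are my own, not a KIP API:

```java
// Hypothetical sketch of a per-record producer error handler, modeled on
// Kafka Streams' ProductionExceptionHandler: decide per record, not per batch.
public class RecordErrorHandlerSketch {

    // Stand-in for org.apache.kafka.common.errors.RecordTooLargeException,
    // so this sketch compiles without Kafka on the classpath.
    static class RecordTooLargeException extends RuntimeException {}

    enum Response { FAIL, SKIP }

    // The producer would invoke this for each record whose send failed,
    // instead of failing the whole batch (or transaction) outright.
    interface ProducerRecordErrorHandler {
        Response onSendError(String topic, Exception error);
    }

    // Example policy: skip records that are too large, fail on anything else.
    static final ProducerRecordErrorHandler SKIP_TOO_LARGE =
        (topic, error) -> error instanceof RecordTooLargeException
            ? Response.SKIP
            : Response.FAIL;
}
```

A real KIP would also have to define how SKIP interacts with ordering guarantees and transactions; the sketch only shows the per-record decision point.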
[jira] [Updated] (KAFKA-15309) Add custom error handler to Producer
[ https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15309: Description: The producer collects multiple records into batches, and a single record specific error might fail the whole batch (eg, `RecordTooLargeException`). This ticket suggests to add a per-record error handler, that allows user to opt into skipping bad records without failing the whole batch (similar to Kafka Streams `ProductionExceptionHandler`). The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket. Another example for which a production exception handler would be useful, if a user tries to write into a non-existing topic, which returns a retryable error code; with infinite retries the producer would hang retrying forever. was: The producer collects multiple records into batches, and a single record specific error might fail the whole batch (eg, `RecordTooLargeException`). This ticket suggest to add a per-record error handler, that allows user to opt into skipping bad records without failing the whole batch (similar to Kafka Streams `ProductionExceptionHandler`). The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket. > Add custom error handler to Producer > > > Key: KAFKA-15309 > URL: https://issues.apache.org/jira/browse/KAFKA-15309 > Project: Kafka > Issue Type: New Feature > Components: producer >Reporter: Matthias J. Sax >Priority: Major > Labels: needs-kip > > The producer collects multiple records into batches, and a single record > specific error might fail the whole batch (eg, `RecordTooLargeException`). > This ticket suggests to add a per-record error handler, that allows user to > opt into skipping bad records without failing the whole batch (similar to > Kafka Streams `ProductionExceptionHandler`). 
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused > https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket. > Another example for which a production exception handler would be useful, if > a user tries to write into a non-existing topic, which returns a retryable > error code; with infinite retries the producer would hang retrying forever.
[jira] [Updated] (KAFKA-15309) Add custom error handler to Producer
[ https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15309:

Description: The producer collects multiple records into batches, and a single record specific error might fail the whole batch (eg, `RecordTooLargeException`). This ticket suggest to add a per-record error handler, that allows user to opt into skipping bad records without failing the whole batch (similar to Kafka Streams `ProductionExceptionHandler`). The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.

was: The producer batches up multiple records into batches, and a single record specific error might fail the whole batch. This ticket suggest to add a per-record error handler, that allows user to opt into skipping bad records without failing the whole batch (similar to Kafka Streams `ProductionExceptionHandler`). The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.

> Add custom error handler to Producer
>
> Key: KAFKA-15309
> URL: https://issues.apache.org/jira/browse/KAFKA-15309
> Project: Kafka
> Issue Type: New Feature
> Components: producer
> Reporter: Matthias J. Sax
> Priority: Major
> Labels: needs-kip
>
> The producer collects multiple records into batches, and a single record specific error might fail the whole batch (eg, `RecordTooLargeException`).
> This ticket suggest to add a per-record error handler, that allows user to opt into skipping bad records without failing the whole batch (similar to Kafka Streams `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.
[jira] [Updated] (KAFKA-15309) Add custom error handler to Producer
[ https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15309:

Description: The producer batches up multiple records into batches, and a single record specific error might fail the whole batch. This ticket suggest to add a per-record error handler, that allows user to opt into skipping bad records without failing the whole batch (similar to Kafka Streams `ProductionExceptionHandler`). The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.

was: The producer batches up multiple records into batches, and a single record specific error might fail the whole batch. This ticket suggest to add a per-record error handler, that allows user to opt into skipping bad records without failing the whole batch (similar to Kafka Streams `ProductionExceptionHandler`.

> Add custom error handler to Producer
>
> Key: KAFKA-15309
> URL: https://issues.apache.org/jira/browse/KAFKA-15309
> Project: Kafka
> Issue Type: New Feature
> Components: producer
> Reporter: Matthias J. Sax
> Priority: Major
> Labels: needs-kip
>
> The producer batches up multiple records into batches, and a single record specific error might fail the whole batch.
> This ticket suggest to add a per-record error handler, that allows user to opt into skipping bad records without failing the whole batch (similar to Kafka Streams `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.
[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once
[ https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751267#comment-17751267 ] Matthias J. Sax commented on KAFKA-15259:

I did sync up with [~cegerton], who worked on https://issues.apache.org/jira/browse/KAFKA-9279, and we came up with this idea: adding a "production exception handler" to the producer that would allow KS to tell the producer to not fail the TX but skip the record: https://issues.apache.org/jira/browse/KAFKA-15309

If we cannot do K15309, an alternative might be to add an internal producer config that allows Kafka Streams to disable the pro-active abort of a TX. This would be safe, because Kafka Streams is actually a good citizen and calls `producer.flush()` and evaluates all callbacks before trying to commit – the issue K9279 addresses is actually bad user behavior: not checking for async errors before committing.

> Kafka Streams does not continue processing due to rollback despite
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.5.1
> Reporter: Tomonari Yamashita
> Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, app_exactly_once.log
>
> [Problem]
> - Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
> -- "CONTINUE will signal that Streams should ignore the issue and continue processing" (1), so Kafka Streams should continue processing even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE is used.
> -- However, if using execute_once, Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down as the default behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT).
>
> [Environment]
> - Kafka Streams 3.5.1
>
> [Reproduction procedure]
> # Create "input-topic" topic and "output-topic"
> # Put several messages on "input-topic"
> # Execute a simple Kafka Streams program that transfers too-large messages from "input-topic" to "output-topic" with execute_once and returns ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the producer. Please refer to the reproducer program (attached file: Reproducer.java).
> # ==> However, Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread shuts down as the default behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to the debug log (attached file: app_exactly_once.log).
> ## My expected behavior is that Kafka Streams should continue processing even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE is used.
>
> [As far as my investigation]
> - FYI, if using at_least_once instead of execute_once, Kafka Streams continues processing without rollback when ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler > - > [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler] > (2) Transaction abort and shutdown occur > {code:java} > 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer > clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, > transactionalId=java-kafka-streams-0_0] Exception occurred during message > send: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread > [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] > stream-task [0_0] Error encountered sending record to topic output-topic for > task 0_0 due to: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > Exception handler choose to CONTINUE processing in spite of this error but > written offsets would not be recorded. > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes
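The reproduction above hinges on the `default.production.exception.handler` config referenced in (1). As a sketch of the setup the reporter describes, the properties below install a custom handler under exactly-once processing; the handler class name is a hypothetical user-provided implementation, not a class that ships with Kafka:

```java
import java.util.Properties;

// Sketch of the Streams configuration from the reproduction scenario above.
// The config keys come from the Streams docs linked in (1); the handler
// class is a placeholder for a user implementation that returns CONTINUE
// on RecordTooLargeException.
public class StreamsHandlerConfigSketch {
    public static Properties withContinueHandler() {
        Properties props = new Properties();
        // Exactly-once processing, where the reported rollback occurs.
        props.put("processing.guarantee", "exactly_once_v2");
        // Hypothetical handler class implementing ProductionExceptionHandler.
        props.put("default.production.exception.handler",
                "com.example.ContinueOnRecordTooLargeHandler");
        return props;
    }
}
```

Under at_least_once the same handler config lets processing continue; as the ticket shows, under exactly-once the transaction is aborted regardless.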
[jira] [Created] (KAFKA-15309) Add custom error handler to Producer
Matthias J. Sax created KAFKA-15309: --- Summary: Add custom error handler to Producer Key: KAFKA-15309 URL: https://issues.apache.org/jira/browse/KAFKA-15309 Project: Kafka Issue Type: New Feature Components: producer Reporter: Matthias J. Sax

The producer collects multiple records into batches, and a single record-specific error might fail the whole batch. This ticket suggests adding a per-record error handler that allows users to opt into skipping bad records without failing the whole batch (similar to Kafka Streams `ProductionExceptionHandler`).
[GitHub] [kafka] showuon merged pull request #14133: KAFKA-15189: only init remote topic metrics when enabled
showuon merged PR #14133: URL: https://github.com/apache/kafka/pull/14133
[GitHub] [kafka] showuon commented on pull request #14133: KAFKA-15189: only init remote topic metrics when enabled
showuon commented on PR #14133: URL: https://github.com/apache/kafka/pull/14133#issuecomment-1666390423 Failed tests are unrelated: ``` Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow() Build / JDK 8 and Scala 2.12 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor() Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord() Build / JDK 11 and Scala 2.13 / org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription(boolean).hasConsumerRack = false Build / JDK 11 and Scala 2.13 / org.apache.kafka.clients.consumer.StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription(boolean).hasConsumerRack = false Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testOffsetTranslationBehindReplicationFlow() Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testSyncTopicConfigs() Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders() Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders() Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testOneWord() Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords() Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig() Build / JDK 17 and Scala 2.13 / org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription(boolean).hasConsumerRack = false Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.clients.consumer.StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription(boolean).hasConsumerRack = false Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow() Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow() Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplicateSourceDefault() Build / JDK 17 and Scala 2.13 / kafka.admin.ReassignPartitionsIntegrationTest.testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders() ```
[GitHub] [kafka] showuon commented on pull request #14151: KAFKA-15083: add config with "remote.log.metadata" prefix
showuon commented on PR #14151: URL: https://github.com/apache/kafka/pull/14151#issuecomment-1666390279 > Yeah, okay, this pull request makes sense to me. To summarise, you are saying that configurations starting with `remote.log.metadata` need to be passed to implementations of the RemoteLogMetadataManager as part of the rlmmProps, however, they are never put in the rlmmProps by the RemoteLogManagerConfig? > > Test failures appear to be unrelated. Correct! Thanks.
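The behavior the PR adds, as summarised above, is essentially prefix-based filtering of the broker configs into the rlmmProps. The sketch below shows the assumed shape of that filtering; it is not the actual RemoteLogManagerConfig code, and the example config key used in the usage note is illustrative:

```java
import java.util.HashMap;
import java.util.Map;

// Assumed shape of the fix discussed above: collect every config whose key
// starts with the "remote.log.metadata" prefix so it can be forwarded to
// the RemoteLogMetadataManager implementation as part of rlmmProps.
public class RlmmPropsSketch {
    public static final String REMOTE_LOG_METADATA_PREFIX = "remote.log.metadata";

    public static Map<String, Object> rlmmProps(Map<String, Object> allConfigs) {
        Map<String, Object> out = new HashMap<>();
        for (Map.Entry<String, Object> e : allConfigs.entrySet()) {
            if (e.getKey().startsWith(REMOTE_LOG_METADATA_PREFIX)) {
                out.put(e.getKey(), e.getValue()); // key kept verbatim, prefix not stripped
            }
        }
        return out;
    }
}
```

With a broker config map containing, say, a `remote.log.metadata.*` entry and `log.retention.bytes`, only the former would land in the rlmmProps handed to the plugin.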
[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once
[ https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751263#comment-17751263 ] Matthias J. Sax commented on KAFKA-15259:

Thanks for digging into it – that's a good find – I was already wondering, because I could not see any code changes between 3.1 and 3.2 in Kafka Streams that would explain it.

And yes, if the producer goes into error state, it is impossible for KS to "revert" it – thus, I am not sure right now how we could fix it... In the end, Kafka Streams does `producer.flush()` and evaluates if there are any errors, detects the `RecordTooLargeException`, and executes the handler, which returns `CONTINUE`, and that is respected. If it were not respected, the `RecordTooLargeException` would be re-thrown right away. But because Kafka Streams does `CONTINUE`, it actually tries to commit, but cannot because the producer is already in error state.

A high-level idea would be to remember the input record offset and, after the failure when the task is restarted, apply an implicit filter that drops the input message right away based on the offset we remembered. But such a thing would need very careful design...

> Kafka Streams does not continue processing due to rollback despite
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.5.1
> Reporter: Tomonari Yamashita
> Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, app_exactly_once.log
>
> [Problem]
> - Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
> -- "CONTINUE will signal that Streams should ignore the issue and continue processing" (1), so Kafka Streams should continue processing even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE is used.
> -- However, if using execute_once, Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down as the default behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT).
>
> [Environment]
> - Kafka Streams 3.5.1
>
> [Reproduction procedure]
> # Create "input-topic" topic and "output-topic"
> # Put several messages on "input-topic"
> # Execute a simple Kafka Streams program that transfers too-large messages from "input-topic" to "output-topic" with execute_once and returns ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the producer. Please refer to the reproducer program (attached file: Reproducer.java).
> # ==> However, Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread shuts down as the default behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to the debug log (attached file: app_exactly_once.log).
> ## My expected behavior is that Kafka Streams should continue processing even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE is used.
>
> [As far as my investigation]
> - FYI, if using at_least_once instead of execute_once, Kafka Streams continues processing without rollback when ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler > - > [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler] > (2) Transaction abort and shutdown occur > {code:java} > 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer > clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, > transactionalId=java-kafka-streams-0_0] Exception occurred during message > send: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread > [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] > stream-task [0_0] Error encountered sending record to topic output-topic for > task 0_0 due to: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is
[GitHub] [kafka] mjsax merged pull request #14105: MINOR: improve logging for FK-join
mjsax merged PR #14105: URL: https://github.com/apache/kafka/pull/14105
[jira] [Assigned] (KAFKA-15202) MM2 OffsetSyncStore clears too many syncs when sync spacing is variable
[ https://issues.apache.org/jira/browse/KAFKA-15202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-15202: --- Assignee: Greg Harris > MM2 OffsetSyncStore clears too many syncs when sync spacing is variable > --- > > Key: KAFKA-15202 > URL: https://issues.apache.org/jira/browse/KAFKA-15202 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.5.0, 3.4.1, 3.3.3 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Major > > The spacing between OffsetSyncs can vary significantly, due to conditions in > the upstream topic and in the replication rate of the MirrorSourceTask. > The OffsetSyncStore attempts to keep a maximal number of distinct syncs > present, and for regularly spaced syncs it does not allow an incoming sync to > expire more than one other unique sync. There are tests to enforce this > property. > For variable spaced syncs, there is no such guarantee, because multiple > fine-grained syncs may need to be expired at the same time. However, instead > of only those fine-grained syncs being expired, the store may also expire > coarser-grained syncs. This causes a large decrease in the number of unique > syncs. > This is an extremely simple example: Syncs: 0 (start), 1, 2, 4. > The result: > {noformat} > TRACE New sync OffsetSync{topicPartition=topic1-2, upstreamOffset=1, > downstreamOffset=1} applied, new state is [1:1,0:0] > (org.apache.kafka.connect.mirror.OffsetSyncStore:194) > TRACE New sync OffsetSync{topicPartition=topic1-2, upstreamOffset=2, > downstreamOffset=2} applied, new state is [2:2,1:1,0:0] > (org.apache.kafka.connect.mirror.OffsetSyncStore:194) > TRACE New sync OffsetSync{topicPartition=topic1-2, upstreamOffset=4, > downstreamOffset=4} applied, new state is [4:4,0:0] > (org.apache.kafka.connect.mirror.OffsetSyncStore:194){noformat} > Instead of being expired, the 2:2 sync should still be present in the final > state, allowing the store to maintain 3 unique syncs. 
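Why losing the 2:2 sync matters becomes clear once translation is spelled out. The sketch below is a deliberately simplified illustration, not the real OffsetSyncStore (which keeps a fixed-size array of exponentially spaced syncs): translation picks the nearest sync at or below the upstream offset, so with state [4:4,0:0] an upstream offset of 3 can only be translated back to downstream offset 0 instead of 2:

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified model of MM2-style offset translation: store sync points as
// upstreamOffset -> downstreamOffset and translate an upstream consumer
// offset via the closest sync at or below it. Fewer retained syncs means
// coarser (further-behind) translated offsets.
public class OffsetTranslationSketch {
    private final TreeMap<Long, Long> syncs = new TreeMap<>();

    public void addSync(long upstream, long downstream) {
        syncs.put(upstream, downstream);
    }

    // Returns the downstream offset of the nearest sync <= upstreamOffset,
    // or -1 when no sync applies.
    public long translateDownstream(long upstreamOffset) {
        Map.Entry<Long, Long> floor = syncs.floorEntry(upstreamOffset);
        return floor == null ? -1L : floor.getValue();
    }
}
```

With only the 0:0 and 4:4 syncs retained (the over-expired state from the ticket), upstream offset 3 translates to 0; keeping the 2:2 sync would translate it to 2.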
[GitHub] [kafka] gharris1727 opened a new pull request, #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced
gharris1727 opened a new pull request, #14156: URL: https://github.com/apache/kafka/pull/14156

The new OffsetSyncStore historical translation cache clears more syncs than necessary when the gap between syncs is variable. This is a problem with the replacement promotion logic, which only covered the base case of promoting one index to the immediately following index. This has the effect that if a sync fails to satisfy an invariant for the following index, it is discarded immediately, even if the value would satisfy the invariants at a different index. In particular, invariant B, which enforces a maximum distance between two syncs, gets more permissive as the index in the array increases, so a sync which does not satisfy invariant B at index 1 may satisfy it at index j > 1.

Instead of the greedy discarding algorithm, the replacement promotion logic should keep a separate index into the potential replacements from the original array, and delay discarding a sync until it can be determined that the sync is not valid at any place in the array. In particular, syncs are certainly not worth keeping if they are duplicates of other syncs, or if they fail invariant C. Invariant C becomes more strict as the index in the array increases, so a sync which does not satisfy invariant C at index 1 will certainly never satisfy it at index j > 1.

In order to verify the changes, new tests were added which use Random to generate gaps between syncs and which generalize the test for maintaining unique syncs to an arbitrary stream of gaps. The different tests cover:
1. Constant spacing
2. Uniform random spacing between 0 and N
3. Uniform random spacing between M and N with M > 0
4. Bimodal spacing at maxOffsetLag and 2x maxOffsetLag

The new algorithm is an extension of the existing one, so all of the consistent-spacing tests have the exact same behavior.
### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
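The difference between the buggy greedy promotion and the forward-scanning fix described in the PR can be sketched with a toy lag bound. The 2^(j+1) limit per slot below is illustrative only, not the exact OffsetSyncStore invariant:

```java
class SyncPromotion {
    // Toy invariant in the spirit of "invariant B": the sync kept in slot j
    // may lag the newest sync by at most 2^(j+1) offsets, so the bound gets
    // more permissive as j grows.
    static long maxLag(int j) {
        return 1L << (j + 1);
    }

    // Greedy logic (the bug): a sync displaced from slot i is kept only if it
    // fits the immediately following slot; otherwise it is discarded.
    static int greedyPromote(long head, long sync, int i) {
        return head - sync <= maxLag(i + 1) ? i + 1 : -1;
    }

    // Fixed logic: keep scanning forward and discard the sync only once it is
    // known to be invalid at every remaining slot.
    static int scanPromote(long head, long sync, int i, int slots) {
        for (int j = i + 1; j < slots; j++) {
            if (head - sync <= maxLag(j)) {
                return j;
            }
        }
        return -1; // truly not valid anywhere in the array
    }
}
```

With head = 100 and a sync at offset 90 displaced from slot 0, the greedy version discards it (slot 1 only allows a lag of 4), while the forward scan keeps it at slot 3, whose lag bound of 16 accommodates it.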
[jira] [Updated] (KAFKA-15308) Wipe Stores upon OffsetOutOfRangeException in ALOS
[ https://issues.apache.org/jira/browse/KAFKA-15308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15308: Affects Version/s: (was: 3.4.0) (was: 3.5.0) > Wipe Stores upon OffsetOutOfRangeException in ALOS > -- > > Key: KAFKA-15308 > URL: https://issues.apache.org/jira/browse/KAFKA-15308 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.0 >Reporter: Colt McNealy >Priority: Minor > > As per this [Confluent Community Slack > Thread|https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1690843733272449?thread_ts=1690663361.858559=C48AHTCUQ], > Streams currently does not wipe away RocksDB state upon encountering an > `OffsetOutOfRangeException` in ALOS. > > `OffsetOutOfRangeException` is a rare case that occurs when a standby task > requests offsets that no longer exist in the topic. We should wipe the store > for three reasons: > # Not wiping the store can be a violation of ALOS since some of the > now-missing offsets could have contained tombstone records. > # Wiping the store has no performance cost since we need to replay the > entirety of what's in the changelog topic anyways. > # I have heard (not yet confirmed myself) that we wipe the store in EOS > anyways, so fixing this bug could remove a bit of complexity from supporting > EOS and ALOS. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15308) Wipe Stores upon OffsetOutOfRangeException in ALOS
Colt McNealy created KAFKA-15308: Summary: Wipe Stores upon OffsetOutOfRangeException in ALOS Key: KAFKA-15308 URL: https://issues.apache.org/jira/browse/KAFKA-15308 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.5.0, 3.4.0, 3.3.0 Reporter: Colt McNealy As per this [Confluent Community Slack Thread|https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1690843733272449?thread_ts=1690663361.858559=C48AHTCUQ], Streams currently does not wipe away RocksDB state upon encountering an `OffsetOutOfRangeException` in ALOS. `OffsetOutOfRangeException` is a rare case that occurs when a standby task requests offsets that no longer exist in the topic. We should wipe the store for three reasons: # Not wiping the store can be a violation of ALOS since some of the now-missing offsets could have contained tombstone records. # Wiping the store has no performance cost since we need to replay the entirety of what's in the changelog topic anyways. # I have heard (not yet confirmed myself) that we wipe the store in EOS anyways, so fixing this bug could remove a bit of complexity from supporting EOS and ALOS. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15307) Kafka Streams configuration docs outdated
Matthias J. Sax created KAFKA-15307: --- Summary: Kafka Streams configuration docs outdated Key: KAFKA-15307 URL: https://issues.apache.org/jira/browse/KAFKA-15307 Project: Kafka Issue Type: Task Components: docs, streams Reporter: Matthias J. Sax [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html] needs to be updated. It's missing a lot of newly added configs, and still lists already removed configs. For deprecated configs, we could consider removing them as well, or add a "deprecated configs" section and keep them for the time being.
[GitHub] [kafka] mehbey commented on pull request #14135: KAFKA-14991: Implementation of KIP-937 which improves message timesta…
mehbey commented on PR #14135: URL: https://github.com/apache/kafka/pull/14135#issuecomment-1666331731 Updated PR to address comments
[GitHub] [kafka] mehbey commented on a diff in pull request #14135: KAFKA-14991: Implementation of KIP-937 which improves message timesta…
mehbey commented on code in PR #14135: URL: https://github.com/apache/kafka/pull/14135#discussion_r1284932829 ## core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala: ## @@ -642,7 +642,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup props.put(KafkaConfig.CompressionTypeProp, "gzip") props.put(KafkaConfig.LogPreAllocateProp, true.toString) props.put(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.LOG_APPEND_TIME.toString) -props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000") Review Comment: I have made three types of changes to the tests: 1) Added parametrized tests to validate all new configs and the deprecated one. See changes in `PlaintextProducerSendTest.scala` and `ProducerRequestTest.scala` 2) Added test cases for backward compatibility for both `before` and `after` cases 3) Removed unnecessary calls to set `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` to the default value. These calls don't add any value to the tests since they are not changing the default value. See example changes in `LogCleanerTest.scala` and `LogManagerTest.scala`
[GitHub] [kafka] mjsax commented on a diff in pull request #14150: KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer
mjsax commented on code in PR #14150: URL: https://github.com/apache/kafka/pull/14150#discussion_r1284932353 ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -890,6 +914,22 @@ public class StreamsConfig extends AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) +.define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, +Type.STRING, +RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, +in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC), +Importance.MEDIUM, +RACK_AWARE_ASSIGNMENT_STRATEGY_DOC) +.define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, +Type.INT, +null, Review Comment: Thanks. We will need a docs PR anyway -- can you open one in the next days and include this information? -- Wondering if the `description` in `StreamsConfig` should also mention the different default values for both assignors and explain that `null` means use those defaults? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #14155: MINOR: update Kafka Streams state.dir doc
mjsax commented on PR #14155: URL: https://github.com/apache/kafka/pull/14155#issuecomment-1666311777 `kafka-site.git` PR: https://github.com/apache/kafka-site/pull/536
[jira] [Commented] (KAFKA-15267) Cluster-wide disablement of Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751241#comment-17751241 ] Satish Duggana commented on KAFKA-15267: Option 4.1 is the option suggested in [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Tiered+Storage+Early+Access+Release+Notes] It does not require a KIP. It is a minor change to add to the existing code. > Cluster-wide disablement of Tiered Storage > -- > > Key: KAFKA-15267 > URL: https://issues.apache.org/jira/browse/KAFKA-15267 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > Labels: tiered-storage > > h2. Summary > KIP-405 defines the configuration {{remote.log.storage.system.enable}} which > controls whether all resources needed for Tiered Storage to function are > instantiated properly in Kafka. However, the interaction between remote data > and Kafka is undefined if that configuration is set to false while there are > still topics with {{remote.storage.enable}} set. *We would > like to give customers the ability to switch off Tiered Storage on a cluster > level and as such would need to define the behaviour.* > {{remote.log.storage.system.enable}} is a read-only configuration. This means > that it can only be changed by *modifying the server.properties* and > restarting brokers. As such, the *validity of values contained in it is > only checked at broker startup*. > This JIRA proposes a few behaviours and a recommendation on a way forward. > h2. Option 1: Change nothing > Pros: > * No operation. > Cons: > * We do not solve the problem of moving back to older (or newer) Kafka > versions not supporting TS. > h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster > level and do not allow it to be disabled > Always instantiate all resources for tiered storage. If no special ones are > selected, use the default ones which come with Kafka.
> Pros: > * We solve the problem of moving between versions by not allowing TS to be > disabled. > Cons: > * We do not solve the problem of moving back to older (or newer) Kafka > versions not supporting TS. > * We haven’t quantified how many compute resources (CPU, memory) idle TS > components occupy. > * TS is a feature not required for running Kafka. As such, while it is still > under development we shouldn’t put it on the critical path of starting a > broker. In this way, a stray memory leak won’t impact anything on the > critical path of a broker. > * We are potentially swapping one problem for another. How does TS behave if > one decides to swap the TS plugin classes when data has already been written? > h2. Option 3: Hide topics with tiering enabled > Customers cannot interact with topics which have tiering enabled. They cannot > create new topics with the same names. Retention (and compaction?) do not > take effect on files already in local storage. > Pros: > * We do not force data-deletion. > Cons: > * This will be quite involved - the controller will need to know when a > broker’s server.properties have been altered; the broker will need to not > proceed to delete logs it is not the leader or follower for. > h2. Option 4: Do not start the broker if there are topics with > tiering enabled - Recommended > This option has 2 different sub-options. The first one is that TS cannot be > disabled on the cluster level if there are *any* tiered topics - in other words > all tiered topics need to be deleted. The second one is that TS cannot be > disabled on the cluster level if there are *any* topics with *tiering enabled* > - they can have tiering disabled, but with a retention policy set to delete > or retain (as per > [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]).
> A topic can have tiering disabled and remain on the cluster as long as there > is no *remote* data when TS is disabled cluster-wide. > Pros: > * We force the customer to be very explicit in disabling tiering of topics > prior to disabling TS on the whole cluster. > Cons: > * You have to make certain that all data in remote is deleted (just a > disablement of a tiered topic is not enough). How do you determine whether all > remote data has expired if the policy is retain? If the retain policy in KIP-950 knows > that there is data in remote then this should also be able to figure it out. > The common denominator is that there needs to be no *remote* data at the > point of disabling TS. As such, the most straightforward option is to refuse > to start brokers if there are topics with the {{remote.storage.enable}} flag > present. This in essence requires customers to clean any tiered topics before > switching off TS,
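As context for the options above, the two flags discussed operate at different levels; a sketch of where each lives (property names from KIP-405, values illustrative):

```properties
# server.properties (broker level, read-only): changing it requires editing
# the file and restarting the broker, so validity is only checked at startup.
remote.log.storage.system.enable=true

# Topic level, set per topic (e.g. at creation or via an ALTER of the
# topic's config), dynamic rather than read-only:
# remote.storage.enable=true
```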
[GitHub] [kafka] lihaosky commented on a diff in pull request #14139: KAFKA-15022: [7/N] use RackAwareTaskAssignor in HAAssignor
lihaosky commented on code in PR #14139: URL: https://github.com/apache/kafka/pull/14139#discussion_r1284914061 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java: ## @@ -57,6 +57,7 @@ public StickyTaskAssignor() { public boolean assign(final Map clients, final Set allTaskIds, final Set statefulTaskIds, + final RackAwareTaskAssignor rackAwareTaskAssignor, Review Comment: Sounds good ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java: ## @@ -124,11 +132,20 @@ private static void assignActiveStatefulTasks(final SortedMap ClientState::assignActive, (source, destination) -> true ); + +if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.canEnableRackAwareAssignor()) { +final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ? Review Comment: Because the costs are different for stateful and stateless tasks. So setting in constructor means we need to pass in two RackAwareTaskAssignor objects. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lihaosky commented on a diff in pull request #14150: KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer
lihaosky commented on code in PR #14150: URL: https://github.com/apache/kafka/pull/14150#discussion_r1284910264 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java: ## @@ -267,25 +267,46 @@ public static class AssignmentConfigs { public final int numStandbyReplicas; public final long probingRebalanceIntervalMs; public final List rackAwareAssignmentTags; +public final Integer rackAwareAssignmentTrafficCost; Review Comment: It can be null and we will use default in assignor. It's related to your questions in https://github.com/apache/kafka/pull/14139#discussion_r1284910181 ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -890,6 +914,22 @@ public class StreamsConfig extends AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) +.define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, +Type.STRING, +RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, +in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC), +Importance.MEDIUM, +RACK_AWARE_ASSIGNMENT_STRATEGY_DOC) +.define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, +Type.INT, +null, Review Comment: The default value is different for `HAAssignor` and `StickyAssignor`, so the null here means use default in assignor. If it's set, it would override assignor's default. I guess we can guide to set relative values. Maybe we can mention default in HAAssignor is 10, 1 and in sticky assignor, it will be 1, 10... 
## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ## @@ -1378,6 +1380,48 @@ public void shouldNotEnableAnyOptimizationsWithNoOptimizationConfig() { assertEquals(0, configs.size()); } +@Test +public void shouldReturnDefaultRackAwareAssignmentConfig() { +final String strategy = streamsConfig.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG); +assertEquals("NONE", strategy); +} + +@Test +public void shouldtSetMinTrafficRackAwareAssignmentConfig() { +props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC); +assertEquals("MIN_TRAFFIC", new StreamsConfig(props).getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)); Review Comment: It will use default cost in assignor -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.
[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751236#comment-17751236 ] Matthias J. Sax commented on KAFKA-15302: - Thanks for reporting this issue. – When you call `store.all()` you get an iterator back that is built over the cache as well as RocksDB. For the underlying RocksDB iterator, it provides an immutable snapshot, thus any later writes into RocksDB are not visible to the iterator. Thus, if the cache is flushed, and we try to read the key from the cache and cannot find it, we go to the underlying RocksDB iterator which cannot see the write. This should explain it. What I am wondering though right now is, why would the cache get flushed to begin with? – There should not be an explicit `store.flush()` call because we only flush before a `commit()`, which happens on the same thread; we might also `evict()` during a `put()` if the cache overflows, but there is no `put()` call in between; the third case I could find is when a new `StreamThread` is added and we need to resize the cache (this would indeed be a concurrent operation). Could adding/removing a thread explain what you observe? Otherwise we would need to do more digging into why the cache is flushed to begin with. If we flush incorrectly and can avoid the flush, we should be able to fix it. If we flush correctly, we might need to have a guard inside the caching layer itself and suppress the flush if there is an open iterator (which actually does not sound like a great solution, but maybe it would be the correct way forward). > Stale value returned when using store.all() in punctuation function. > > > Key: KAFKA-15302 > URL: https://issues.apache.org/jira/browse/KAFKA-15302 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.1 >Reporter: Jinyong Choi >Priority: Major > > When using the store.all() function within the Punctuation function of > this.context.schedule, the previous value is returned.
In other words, even > though the value has been stored from 1 to 2, it doesn't return 2; instead, > it returns 1. > In the provided test code, you can see the output 'BROKEN !!!', and while > this doesn't occur 100% of the time, by adding logs, it's evident that during > the while loop after all() is called, the cache is flushed. As a result, the > named cache holds a null value, causing the return of a value from RocksDB. > This is observed as the value after the .get() call being different from the > expected value. This is possibly due to the consistent-read functionality of > RocksDB, although the exact cause is not certain. > Of course, if you perform {{store.flush()}} before {{all()}} there won't be > any errors. > > * test code (forked from balajirrao and modified for this) > [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main] > > {code:java} > private void forwardAll(final long timestamp) { > System.err.println("forwardAll Start"); > KeyValueIterator<String, Integer> kvList = this.kvStore.all(); > while (kvList.hasNext()) { > KeyValue<String, Integer> entry = kvList.next(); > final Record<String, Integer> msg = new Record<>(entry.key, > entry.value, context.currentSystemTimeMs()); > final Integer storeValue = this.kvStore.get(entry.key); > if > (entry.value != storeValue) { > System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: > " + entry.key + " Expected in stored(Cache or Store) value: " + storeValue + > " but KeyValueIterator value: " + entry.value); > throw new RuntimeException("Broken!"); > } > this.context.forward(msg); > } > kvList.close(); > } > {code} > * log file (add log in stream source) > > {code:java} > # console log > sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1" > [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20) > ... > [info] running Coordinator 1 > appid: 95108c48-7c69-4eeb-adbd-9d091bd84933 > [0] starting instance +1 > forwardAll Start > [0]!!! BROKEN !!!
Key: 636398 Expected in stored(Cache or Store) value: 2 but > KeyValueIterator value: 1 > # log file > ... > 01:05:00.382 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on > flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401 > 01:05:00.388 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush > dirtyKeys.size():7873 entries:7873 > 01:05:00.434 >
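The race described in the thread — an open iterator pinning an immutable store snapshot while the cache is concurrently flushed — can be reproduced with a toy model in plain Java. Here HashMaps stand in for the named cache and RocksDB; this is not the Streams caching layer, just an illustration of its read path:

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeSet;

// Toy model of a caching store: point reads consult the live cache first,
// but all() pins an immutable snapshot of the backing store (as a RocksDB
// iterator does) and only reads the cache lazily while iterating.
class StaleIteratorDemo {
    final Map<String, Integer> cache = new HashMap<>();
    final Map<String, Integer> backing = new HashMap<>();

    void put(String key, int value) { cache.put(key, value); }

    // flush() moves all dirty cache entries into the backing store.
    void flush() { backing.putAll(cache); cache.clear(); }

    // Point lookup: live cache first, then the live backing store.
    Integer get(String key) {
        return cache.containsKey(key) ? cache.get(key) : backing.get(key);
    }

    Iterator<Map.Entry<String, Integer>> all() {
        final Map<String, Integer> snapshot = new HashMap<>(backing); // immutable view
        final TreeSet<String> keys = new TreeSet<>(snapshot.keySet());
        keys.addAll(cache.keySet());
        final Iterator<String> it = keys.iterator();
        return new Iterator<Map.Entry<String, Integer>>() {
            @Override public boolean hasNext() { return it.hasNext(); }
            @Override public Map.Entry<String, Integer> next() {
                String k = it.next();
                // A flush between all() and next() empties the cache, so the
                // lookup falls through to the stale snapshot value.
                Integer v = cache.containsKey(k) ? cache.get(k) : snapshot.get(k);
                return new AbstractMap.SimpleEntry<>(k, v);
            }
        };
    }
}
```

Putting `a -> 1`, flushing, putting `a -> 2`, opening the iterator, and then flushing again makes `get("a")` return 2 while the iterator still yields the stale 1 — matching the "BROKEN" check in the report.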
[GitHub] [kafka] lihaosky commented on a diff in pull request #14139: KAFKA-15022: [7/N] use RackAwareTaskAssignor in HAAssignor
lihaosky commented on code in PR #14139: URL: https://github.com/apache/kafka/pull/14139#discussion_r1284910181 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java: ## @@ -43,21 +43,27 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor { private static final Logger log = LoggerFactory.getLogger(HighAvailabilityTaskAssignor.class); +private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10; +private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 1; +private static final int DEFAULT_STATELESS_TRAFFIC_COST = 1; +private static final int DEFAULT_STATELESS_NON_OVERLAP_COST = 1; Review Comment: Yeah. Only the stateful costs are configurable. The default is not set in `StreamsConfig` because it will be different for `HAAssignor` and `StickyAssignor`. So the default in `StreamsConfig` is null, which means use the assignor's default.
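The null-default pattern described here (a `null` in `StreamsConfig` meaning "fall back to the assignor's own default") boils down to a small resolution step; a hypothetical helper, not the actual Streams code:

```java
// Sketch: a config declared with a null default in StreamsConfig is resolved
// against an assignor-specific fallback at assignment time. Per the thread,
// the HA assignor would default the traffic cost to 10, the sticky one to 1.
class CostResolver {
    static int resolve(Integer configuredCost, int assignorDefault) {
        // null means the user did not set the config -> use the assignor default
        return configuredCost == null ? assignorDefault : configuredCost;
    }
}
```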
[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580
AndrewJSchofield commented on code in PR #14111: URL: https://github.com/apache/kafka/pull/14111#discussion_r1284873983 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -701,7 +713,9 @@ private long partitionReady(Cluster cluster, long nowMs, String topic, } waitedTimeMs = batch.waitedTimeMs(nowMs); -backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs; +backingOff = !batch.hasLeaderChanged(leader) && +batch.attempts() > 0 && +waitedTimeMs < retryBackoff.backoff(batch.attempts() - 1); Review Comment: Done
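The `retryBackoff.backoff(batch.attempts() - 1)` call in the diff replaces the fixed `retryBackoffMs` wait with the exponentially growing, capped delay proposed by KIP-580. A minimal sketch of that formula (helper names assumed; the real client additionally mixes in random jitter):

```java
class ExponentialBackoffSketch {
    // delay(attempts) = min(initial * multiplier^attempts, max)
    static long backoff(long initialMs, double multiplier, long maxMs, int attempts) {
        double delay = initialMs * Math.pow(multiplier, attempts);
        return (long) Math.min(delay, (double) maxMs);
    }
}
```

With an initial backoff of 100 ms, a multiplier of 2, and a 1000 ms cap, successive retries wait 100, 200, 400, 800, and then 1000 ms.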
[GitHub] [kafka] mjsax commented on a diff in pull request #14139: KAFKA-15022: [7/N] use RackAwareTaskAssignor in HAAssignor
mjsax commented on code in PR #14139: URL: https://github.com/apache/kafka/pull/14139#discussion_r1284866806 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java: ## @@ -43,21 +43,27 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor { private static final Logger log = LoggerFactory.getLogger(HighAvailabilityTaskAssignor.class); +private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10; +private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 1; +private static final int DEFAULT_STATELESS_TRAFFIC_COST = 1; +private static final int DEFAULT_STATELESS_NON_OVERLAP_COST = 1; Review Comment: We have different costs for stateful vs stateless here, but the configs added only allow passing in one value each for traffic and non-overlap. How does this fit together? From the code below it seems users only configure the cost for stateful anyway, but the stateless cost is always `1` (for this case, the variables should not be called `DEFAULT` because they cannot be changed). Also, if we have default values, should they not be set in `StreamsConfig`? ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java: ## @@ -57,6 +57,7 @@ public StickyTaskAssignor() { public boolean assign(final Map clients, final Set allTaskIds, final Set statefulTaskIds, + final RackAwareTaskAssignor rackAwareTaskAssignor, Review Comment: Should we make this an `Optional` ? ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java: ## @@ -124,11 +132,20 @@ private static void assignActiveStatefulTasks(final SortedMap ClientState::assignActive, (source, destination) -> true ); + +if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.canEnableRackAwareAssignor()) { +final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ? Review Comment: Why do we get `trafficCost` each time?
Seems we should set it up in the constructor just a single time (or implement `Configurable` interface)? (Same for `nonOverlapCost`) ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java: ## @@ -16,18 +16,29 @@ */ package org.apache.kafka.streams.processor.internals.assignment; +import java.util.Optional; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.TaskId; import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.kafka.streams.processor.internals.InternalTopicManager; +import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; public interface TaskAssignor { /** * @return whether the generated assignment requires a followup probing rebalance to satisfy all conditions */ -boolean assign(Map clients, - Set allTaskIds, - Set statefulTaskIds, - AssignorConfiguration.AssignmentConfigs configs); +boolean assign(final Map clients, + final Set allTaskIds, + final Set statefulTaskIds, + final Cluster cluster, + final Map> partitionsForTask, + final Map> changelogPartitionsForTask, + final Map> tasksForTopicGroup, + final Map>> racksForProcessConsumer, + final InternalTopicManager internalTopicManager, Review Comment: It's internal. I am fine anyway -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #14150: KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer
mjsax commented on code in PR #14150: URL: https://github.com/apache/kafka/pull/14150#discussion_r1284793025 ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier"; public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the org.apache.kafka.streams.KafkaClientSupplier interface."; +public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE"; +public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC"; + +/** {@code } rack.aware.assignment.strategy */ +@SuppressWarnings("WeakerAccess") +public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy"; +public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning" ++ " tasks to minimize cross rack traffic. 
Valid settings are : " + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + ", which will disable rack aware assignment; " + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC
++ ", which will compute minimum cross rack traffic assignment";

Review Comment:
```suggestion
+ ", which will compute minimum cross rack traffic assignment.";
```

## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
## @@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig {
 public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier";
 public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the org.apache.kafka.streams.KafkaClientSupplier interface.";
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE";
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC";
+
+/** {@code } rack.aware.assignment.strategy */
+@SuppressWarnings("WeakerAccess")
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy";
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning"

Review Comment:
```suggestion
public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. "
+ "Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning"
```

## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
## @@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig {
 public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier";
 public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the org.apache.kafka.streams.KafkaClientSupplier interface.";
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE";
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC";
+
+/** {@code } rack.aware.assignment.strategy */
+@SuppressWarnings("WeakerAccess")
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy";
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning"
++ " tasks to minimize cross rack traffic. Valid settings are : " + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + ", which will disable rack aware assignment; " + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC
++ ", which will compute minimum cross rack traffic assignment";
+
+@SuppressWarnings("WeakerAccess")
+public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG = "rack.aware.assignment.traffic_cost";
+public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC = "Cost associated with cross rack traffic. This config and rack.aware.assignment.non_overlap_cost controls whether the "
++ "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value " + RackAwareTaskAssignor.class.getName() + " will "
++ "optimize minimizing cross rack traffic";

Review Comment:
```suggestion
+ "optimize for minimizing cross rack traffic.";
```

## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
## @@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig { public static
[jira] [Updated] (KAFKA-15306) Integrate committed offsets logic when updating fetching positions
[ https://issues.apache.org/jira/browse/KAFKA-15306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-15306: --- Description: Integrate refreshCommittedOffsets logic, currently performed by the coordinator, into the update fetch positions performed on every iteration of the async consumer poll loop. This should rely on the CommitRequestManager to perform the request based on the refactored model, but it should reuse the logic for processing the committed offsets and updating the positions. (was: Integrate refreshCommittedOffsets logic, currently performed by the coordinator, into the update fetch positions performed on every iteration of the consumer poll loop. ) > Integrate committed offsets logic when updating fetching positions > -- > > Key: KAFKA-15306 > URL: https://issues.apache.org/jira/browse/KAFKA-15306 > Project: Kafka > Issue Type: Task >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: client, consumer, kip-945 > > Integrate refreshCommittedOffsets logic, currently performed by the > coordinator, into the update fetch positions performed on every iteration of > the async consumer poll loop. This should rely on the CommitRequestManager to > perform the request based on the refactored model, but it should reuse the > logic for processing the committed offsets and updating the positions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
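The flow described above can be sketched as follows. This is a hedged, illustrative model only (the class and method names here are invented for the sketch, not the actual async consumer internals): on each poll iteration, any assigned partition without a valid fetch position is first seeded from its committed offset, which the real code would obtain via the CommitRequestManager, and only partitions with no committed offset fall back to the reset policy.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only: the real consumer resolves positions through
// request managers and background-thread events, not a plain map lookup.
class FetchPositionUpdater {
    final Map<String, Long> positions = new HashMap<>();  // partition -> fetch position
    final Map<String, Long> committed;                    // stand-in for committed offsets

    FetchPositionUpdater(Map<String, Long> committed) {
        this.committed = committed;
    }

    // Called on every poll-loop iteration for partitions lacking a valid position.
    void updateFetchPositions(List<String> assigned, long resetOffset) {
        for (String tp : assigned) {
            if (positions.containsKey(tp)) continue;      // position already valid
            Long offset = committed.get(tp);              // would go through CommitRequestManager
            positions.put(tp, offset != null ? offset : resetOffset); // else apply reset policy
        }
    }
}
```

For example, with a committed offset of 42 for partition t-0 and none for t-1, the updater seeds t-0 at 42 and resets t-1 to the reset offset.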
[jira] [Created] (KAFKA-15306) Integrate committed offsets logic when updating fetching positions
Lianet Magrans created KAFKA-15306: -- Summary: Integrate committed offsets logic when updating fetching positions Key: KAFKA-15306 URL: https://issues.apache.org/jira/browse/KAFKA-15306 Project: Kafka Issue Type: Task Reporter: Lianet Magrans Assignee: Lianet Magrans Integrate refreshCommittedOffsets logic, currently performed by the coordinator, into the update fetch positions performed on every iteration of the consumer poll loop. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] ableegoldman commented on pull request #14149: HOTFIX: avoid placement of unnecessary transient standby tasks
ableegoldman commented on PR #14149: URL: https://github.com/apache/kafka/pull/14149#issuecomment-1666170754

Test failures are unrelated. Super small fix, would like to get it into 3.6 though. Ping also @vvcephei @showuon @wcarlson5 for a review.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15090) Source tasks are no longer stopped on a separate thread
[ https://issues.apache.org/jira/browse/KAFKA-15090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751214#comment-17751214 ] Chris Egerton commented on KAFKA-15090: --- This is a tricky one. First off, although we're almost certainly not going to move calls to {{SourceTask::stop}} back onto the herder tick thread, it's not unreasonable to try to restore some of the original behavior by spinning up a separate thread that can be used to call that method (so that we don't block on tasks returning from {{SourceTask::poll}} before being able to stop them). However, this complicates the logic for task cleanup. Currently, tasks can deallocate all of their resources in {{{}SourceTask::stop{}}}, secure in the knowledge that the runtime will never poll them again or notify them of committed offsets. If we go back to potentially invoking {{SourceTask::stop}} and {{SourceTask::poll}} concurrently, then it becomes difficult for tasks to know when exactly they can deallocate resources. This is the motivation behind the now-abandoned [KIP-419|https://cwiki.apache.org/confluence/display/KAFKA/KIP-419%3A+Safely+notify+Kafka+Connect+SourceTask+is+stopped]. One alternative is to keep most of the current behavior, but with stronger semantics for cancelling source tasks. This is described in KAFKA-14725; to summarize: we would continue invoking {{SourceTask::stop}} and {{SourceTask::poll}} on the same thread like we do today, but would start interrupting the poll-convert-produce thread when tasks exceed the graceful shutdown timeout. Advantages of this alternative are that it preserves behavior that has been around in the Connect runtime for the last five releases, and makes it easier for developers to correctly implement resource deallocation logic in their source connectors. 
Disadvantages are that it does not make graceful task shutdown easier (increasing the likelihood of [ERROR-level log messages|https://github.com/apache/kafka/blob/b3db905b27ff4133f4018ac922c9ce2beb2d6087/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1035]), and may not work for all connectors (interrupting a thread in Java is not guaranteed to actually interrupt some operations, even if they are blocking). Finally, we could pursue something identical or similar to [KIP-419|https://cwiki.apache.org/confluence/display/KAFKA/KIP-419%3A+Safely+notify+Kafka+Connect+SourceTask+is+stopped], by modifying the source task API to have separate methods for "triggering" a stop (which signals that the task should immediately return from any future or in-progress calls to {{{}SourceTask::poll{}}}) and "performing" a stop (wherein the task deallocates resources). > Source tasks are no longer stopped on a separate thread > --- > > Key: KAFKA-15090 > URL: https://issues.apache.org/jira/browse/KAFKA-15090 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, > 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 3.3.2, 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, > 3.3.3, 3.6.0, 3.5.1 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > Before [https://github.com/apache/kafka/pull/9669], in distributed mode, the > {{SourceTask::stop}} method would be invoked on the herder tick thread, which > is a separate thread from the dedicated thread which was responsible for > polling data from the task and producing it to Kafka. > This aligned with the Javadocs for {{{}SourceTask:poll{}}}, which state: > {quote}The task will be stopped on a separate thread, and when that happens > this method is expected to unblock, quickly finish up any remaining > processing, and return. 
> {quote} > However, it came with the downside that the herder's tick thread would be > blocked until the invocation of {{SourceTask::stop}} completed, which could > result in major parts of the worker's REST API becoming unavailable and even > the worker falling out of the cluster. > As a result, in [https://github.com/apache/kafka/pull/9669,] we changed the > logic for task shutdown to cause {{SourceTask::stop}} to be invoked on the > dedicated thread for the task (i.e., the one responsible for polling data > from it and producing that data to Kafka). > This altered the semantics for {{SourceTask:poll}} and {{SourceTask::stop}} > and may have broken connectors that block during {{poll}} with the > expectation that {{stop}} can and will be invoked concurrently as a signal > that any ongoing polls should be interrupted immediately. > Although reverting the fix is likely not a viable option (blocking the herder > thread on interactions with user-written plugins is high-risk and we have > tried to eliminate all instances of this where
[jira] [Commented] (KAFKA-15303) Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema
[ https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751212#comment-17751212 ] Matthias J. Sax commented on KAFKA-15303: - Thanks for filing this ticket. Kafka Streams has only limited support for schema evolution, because Kafka Streams is schema agnostic – the runtime does not even know what format is used, and thus cannot reason about it. I updated the ticket as "improvement" because it's not a bug: the system works as designed. In the end, Kafka Streams uses whatever Serde you provide, so it's not clear if we even could fix it on our end? Maybe you could put some hack into the Serde you provide to fix it? It's unfortunately not possible atm to get the original raw bytes (that would allow us to avoid the re-serialization to begin with). > Foreign key joins no longer triggered by events on the right side of the join > after deployment with a new compatible Avro schema > > > Key: KAFKA-15303 > URL: https://issues.apache.org/jira/browse/KAFKA-15303 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 3.4.0 >Reporter: Charles-Eddy >Priority: Major > Attachments: image (1).png > > > Hello everyone, I am currently working on a project that uses Kafka Streams > (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK. > Our goal is to join offer information from our sellers with additional data > from various input topics, and then feed the resulting joined information > into an output topic. > Our application is deployed in Kubernetes using the StatefulSet feature, with > one EBS volume per Kafka Streams pod and 5 Streams Threads per pod. > We are using avro to serialize / deserialize input topics and storing in the > state stores of Kafka streams. > We have encountered a bug in Kafka Streams that prevents us from deploying > new versions of Kafka Streams containing new compatible Avro schemas of our > input topics.
> The symptom is that after deploying our new version, which contains no > changes in topology but only changes to the Avro schema used, we discard > every event coming from the right part of the join concerned by these Avro > schema changes until we receive something from the left part of the join. > As a result, we are losing events and corrupting our output topics and stores > with outdated information. > After checking the local help for the priority to assign, I have assigned it > as *CRITICAL* because we are losing data (for example, tombstones are not > propagated to the following joins, so some products are still visible on our > website when they should not be). > Please feel free to change the priority if you think it is not appropriate. > > *The bug:* > After multiple hours of investigation we found out that the bug is located in > the foreign key join feature and specifically in this class: > *SubscriptionResolverJoinProcessorSupplier* in the left part of a foreign key > join. > This class and his method process(...) is computing a hash from the local > store via a serialization of a deserialized value from the left state store > and comparing it with the hash of the original message from the > subscription-response-topic. > > It means that when we deploy a new version of our kafka streams instance with > a new compatible avro schema from the left side of a join, every join > triggered by the right part of the join are invalidated until we receive all > the events again on the left side. Every join triggered by the right part of > the join are discarded because all the hashes computed by kafka streams are > different now from the original messages. > > *How to reproduce it:* > If we take a working and a non-working workflow, it will do something like > this: > +Normal non-breaking+ workflow from the left part of the FK join: > # A new offer event occurs. The offer is received and stored (v1). 
> # A subscription registration is sent with the offer-hash (v1). > # The subscription is saved to the store with the v1 offer-hash. > # Product data is searched for. > # If product data is found, a subscription response is sent back, including > the v1 offer-hash. > # The offer data in the store is searched for and the offer hashes between > the store (v1) and response event (also v1) are compared. > # Finally, the join result is sent. > New product event from the right part of the FK join: > # The product is received and stored. > # All related offers in the registration store are searched for. > # A subscription response is sent for each offer, including their offer hash > (v1). > # The offer data in the store is searched for and the offer
[jira] [Updated] (KAFKA-15303) Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema
[ https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15303: Issue Type: Improvement (was: Bug) > Foreign key joins no longer triggered by events on the right side of the join > after deployment with a new compatible Avro schema > > > Key: KAFKA-15303 > URL: https://issues.apache.org/jira/browse/KAFKA-15303 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 3.4.0 >Reporter: Charles-Eddy >Priority: Critical > Attachments: image (1).png > > > Hello everyone, I am currently working on a project that uses Kafka Streams > (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK. > Our goal is to join offer information from our sellers with additional data > from various input topics, and then feed the resulting joined information > into an output topic. > Our application is deployed in Kubernetes using the StatefulSet feature, with > one EBS volume per Kafka Streams pod and 5 Streams Threads per pod. > We are using avro to serialize / deserialize input topics and storing in the > state stores of Kafka streams. > We have encountered a bug in Kafka Streams that prevents us from deploying > new versions of Kafka Streams containing new compatible Avro schemas of our > input topics. > The symptom is that after deploying our new version, which contains no > changes in topology but only changes to the Avro schema used, we discard > every event coming from the right part of the join concerned by these Avro > schema changes until we receive something from the left part of the join. > As a result, we are losing events and corrupting our output topics and stores > with outdated information. 
> After checking the local help for the priority to assign, I have assigned it > as *CRITICAL* because we are losing data (for example, tombstones are not > propagated to the following joins, so some products are still visible on our > website when they should not be). > Please feel free to change the priority if you think it is not appropriate. > > *The bug:* > After multiple hours of investigation we found out that the bug is located in > the foreign key join feature and specifically in this class: > *SubscriptionResolverJoinProcessorSupplier* in the left part of a foreign key > join. > This class and his method process(...) is computing a hash from the local > store via a serialization of a deserialized value from the left state store > and comparing it with the hash of the original message from the > subscription-response-topic. > > It means that when we deploy a new version of our kafka streams instance with > a new compatible avro schema from the left side of a join, every join > triggered by the right part of the join are invalidated until we receive all > the events again on the left side. Every join triggered by the right part of > the join are discarded because all the hashes computed by kafka streams are > different now from the original messages. > > *How to reproduce it:* > If we take a working and a non-working workflow, it will do something like > this: > +Normal non-breaking+ workflow from the left part of the FK join: > # A new offer event occurs. The offer is received and stored (v1). > # A subscription registration is sent with the offer-hash (v1). > # The subscription is saved to the store with the v1 offer-hash. > # Product data is searched for. > # If product data is found, a subscription response is sent back, including > the v1 offer-hash. > # The offer data in the store is searched for and the offer hashes between > the store (v1) and response event (also v1) are compared. > # Finally, the join result is sent. 
> New product event from the right part of the FK join: > # The product is received and stored. > # All related offers in the registration store are searched for. > # A subscription response is sent for each offer, including their offer hash > (v1). > # The offer data in the store is searched for and the offer hashes between > the store (v1) and response event (also v1) are compared. > # Finally, the join result is sent. > > +Breaking workflow:+ > The offer serializer is changed to offer v2 > New product event from the right part of the FK join: > # The product is received and stored. > # All related offers in the registration store are searched for. > # A subscription response is sent for each offer, including their offer hash > (v1). > # The offer data in the store is searched for and the offer hashes between > the store (v2) and response event (still v1) are compared. > # The join result is not sent since the hash is
[jira] [Updated] (KAFKA-15303) Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema
[ https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15303: Priority: Major (was: Critical) > Foreign key joins no longer triggered by events on the right side of the join > after deployment with a new compatible Avro schema > > > Key: KAFKA-15303 > URL: https://issues.apache.org/jira/browse/KAFKA-15303 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 3.4.0 >Reporter: Charles-Eddy >Priority: Major > Attachments: image (1).png > > > Hello everyone, I am currently working on a project that uses Kafka Streams > (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK. > Our goal is to join offer information from our sellers with additional data > from various input topics, and then feed the resulting joined information > into an output topic. > Our application is deployed in Kubernetes using the StatefulSet feature, with > one EBS volume per Kafka Streams pod and 5 Streams Threads per pod. > We are using avro to serialize / deserialize input topics and storing in the > state stores of Kafka streams. > We have encountered a bug in Kafka Streams that prevents us from deploying > new versions of Kafka Streams containing new compatible Avro schemas of our > input topics. > The symptom is that after deploying our new version, which contains no > changes in topology but only changes to the Avro schema used, we discard > every event coming from the right part of the join concerned by these Avro > schema changes until we receive something from the left part of the join. > As a result, we are losing events and corrupting our output topics and stores > with outdated information. 
> After checking the local help for the priority to assign, I have assigned it > as *CRITICAL* because we are losing data (for example, tombstones are not > propagated to the following joins, so some products are still visible on our > website when they should not be). > Please feel free to change the priority if you think it is not appropriate. > > *The bug:* > After multiple hours of investigation we found out that the bug is located in > the foreign key join feature and specifically in this class: > *SubscriptionResolverJoinProcessorSupplier* in the left part of a foreign key > join. > This class and his method process(...) is computing a hash from the local > store via a serialization of a deserialized value from the left state store > and comparing it with the hash of the original message from the > subscription-response-topic. > > It means that when we deploy a new version of our kafka streams instance with > a new compatible avro schema from the left side of a join, every join > triggered by the right part of the join are invalidated until we receive all > the events again on the left side. Every join triggered by the right part of > the join are discarded because all the hashes computed by kafka streams are > different now from the original messages. > > *How to reproduce it:* > If we take a working and a non-working workflow, it will do something like > this: > +Normal non-breaking+ workflow from the left part of the FK join: > # A new offer event occurs. The offer is received and stored (v1). > # A subscription registration is sent with the offer-hash (v1). > # The subscription is saved to the store with the v1 offer-hash. > # Product data is searched for. > # If product data is found, a subscription response is sent back, including > the v1 offer-hash. > # The offer data in the store is searched for and the offer hashes between > the store (v1) and response event (also v1) are compared. > # Finally, the join result is sent. 
> New product event from the right part of the FK join: > # The product is received and stored. > # All related offers in the registration store are searched for. > # A subscription response is sent for each offer, including their offer hash > (v1). > # The offer data in the store is searched for and the offer hashes between > the store (v1) and response event (also v1) are compared. > # Finally, the join result is sent. > > +Breaking workflow:+ > The offer serializer is changed to offer v2 > New product event from the right part of the FK join: > # The product is received and stored. > # All related offers in the registration store are searched for. > # A subscription response is sent for each offer, including their offer hash > (v1). > # The offer data in the store is searched for and the offer hashes between > the store (v2) and response event (still v1) are compared. > # The join result is not sent since the hash is different.
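To make the hash mismatch described above concrete, here is a hedged toy sketch. Plain string "serializers" stand in for the Avro serdes, FkJoinHashDemo and its methods are invented for illustration, and the comparison is on raw bytes rather than the murmur hash used inside the real SubscriptionResolverJoinProcessorSupplier. The subscription response carries the fingerprint of the originally serialized offer, while the left side re-serializes the deserialized store value with whatever serializer is currently deployed; once the serializer changes, the two fingerprints disagree and the join result is dropped.

```java
import java.util.Arrays;
import java.util.function.Function;

// Toy stand-ins for two compatible schema versions of the offer serializer:
// v2 writes an extra optional field, so the serialized bytes differ from v1.
class FkJoinHashDemo {
    static byte[] serializeV1(String offer) { return offer.getBytes(); }
    static byte[] serializeV2(String offer) { return (offer + "|newOptionalField=null").getBytes(); }

    // Mimics the check on the left side of the FK join: re-serialize the stored
    // value with the *current* serializer and compare against the fingerprint
    // carried by the subscription-response event.
    static boolean joinWouldFire(byte[] fingerprintFromResponse, String storedOffer,
                                 Function<String, byte[]> currentSerializer) {
        return Arrays.equals(currentSerializer.apply(storedOffer), fingerprintFromResponse);
    }
}
```

Before the upgrade both sides use v1 and the join fires; after deploying v2 on the left side, every right-side trigger is discarded until the offer is received again and re-fingerprinted with v2.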
[GitHub] [kafka] gharris1727 commented on pull request #14092: KAFKA-15239: Fix system tests using producer performance service
gharris1727 commented on PR #14092: URL: https://github.com/apache/kafka/pull/14092#issuecomment-1666104751 @fvaleri That does appear to work, because [server-common is not being excluded from tools-dependant-libs](https://github.com/apache/kafka/blob/b3db905b27ff4133f4018ac922c9ce2beb2d6087/build.gradle#L1895-L1897) like the clients is. I don't think this was intentional; I think it was accidentally left out when server-common was added as a dependency in #12469. There is a compatibility boundary between server-common and clients in this case, but that was already there before your change. If we choose not to change this compatibility boundary from between server-common + clients to between tools + server-common, then I think the fix you propose is possible. I agree with your reasoning that server-common is a reasonable dependency for the connect-runtime, even if its only present use is the ThroughputThrottler. I don't really mind whether it's in clients or server-common, as long as it's visible in the dependent modules. If someone later moves the compatibility boundary and reduces the range of the version mixing (by changing the if condition to `if node.version < LATEST_1_1`), then moving this class around will have no effect anyway, so this fix doesn't make it any harder to add more fixes later. Can you revert the changes in producer_performance.py? They don't seem to have an effect anymore. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15090) Source tasks are no longer stopped on a separate thread
[ https://issues.apache.org/jira/browse/KAFKA-15090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-15090: -- Description: Before [https://github.com/apache/kafka/pull/9669], in distributed mode, the {{SourceTask::stop}} method would be invoked on the herder tick thread, which is a separate thread from the dedicated thread which was responsible for polling data from the task and producing it to Kafka. This aligned with the Javadocs for {{{}SourceTask:poll{}}}, which state: {quote}The task will be stopped on a separate thread, and when that happens this method is expected to unblock, quickly finish up any remaining processing, and return. {quote} However, it came with the downside that the herder's tick thread would be blocked until the invocation of {{SourceTask::stop}} completed, which could result in major parts of the worker's REST API becoming unavailable and even the worker falling out of the cluster. As a result, in [https://github.com/apache/kafka/pull/9669,] we changed the logic for task shutdown to cause {{SourceTask::stop}} to be invoked on the dedicated thread for the task (i.e., the one responsible for polling data from it and producing that data to Kafka). This altered the semantics for {{SourceTask:poll}} and {{SourceTask::stop}} and may have broken connectors that block during {{poll}} with the expectation that {{stop}} can and will be invoked concurrently as a signal that any ongoing polls should be interrupted immediately. Although reverting the fix is likely not a viable option (blocking the herder thread on interactions with user-written plugins is high-risk and we have tried to eliminate all instances of this where feasible), we may try to restore the expected contract by spinning up a separate thread exclusively for invoking {{SourceTask::stop}} separately from the dedicated thread for the task and the herder's thread. 
was: Before [https://github.com/apache/kafka/pull/9669,] in distributed mode, the {{SourceTask::stop}} method would be invoked on the herder tick thread, which is a separate thread from the dedicated thread which was responsible for polling data from the task and producing it to Kafka. This aligned with the Javadocs for {{{}SourceTask:poll{}}}, which state: {quote}The task will be stopped on a separate thread, and when that happens this method is expected to unblock, quickly finish up any remaining processing, and return. {quote} However, it came with the downside that the herder's tick thread would be blocked until the invocation of {{SourceTask::stop}} completed, which could result in major parts of the worker's REST API becoming unavailable and even the worker falling out of the cluster. As a result, in [https://github.com/apache/kafka/pull/9669,] we changed the logic for task shutdown to cause {{SourceTask::stop}} to be invoked on the dedicated thread for the task (i.e., the one responsible for polling data from it and producing that data to Kafka). This altered the semantics for {{SourceTask:poll}} and {{SourceTask::stop}} and may have broken connectors that block during {{poll}} with the expectation that {{stop}} can and will be invoked concurrently as a signal that any ongoing polls should be interrupted immediately. Although reverting the fix is likely not a viable option (blocking the herder thread on interactions with user-written plugins is high-risk and we have tried to eliminate all instances of this where feasible), we may try to restore the expected contract by spinning up a separate thread exclusively for invoking {{SourceTask::stop}} separately from the dedicated thread for the task and the herder's thread. 
> Source tasks are no longer stopped on a separate thread > --- > > Key: KAFKA-15090 > URL: https://issues.apache.org/jira/browse/KAFKA-15090 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, > 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 3.3.2, 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, > 3.3.3, 3.6.0, 3.5.1 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > Before [https://github.com/apache/kafka/pull/9669], in distributed mode, the > {{SourceTask::stop}} method would be invoked on the herder tick thread, which > is a separate thread from the dedicated thread which was responsible for > polling data from the task and producing it to Kafka. > This aligned with the Javadocs for {{{}SourceTask:poll{}}}, which state: > {quote}The task will be stopped on a separate thread, and when that happens > this method is expected to unblock, quickly finish up any remaining > processing, and return. > {quote} > However, it came with the downside that the herder's tick thread
[jira] [Created] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired
Philip Nee created KAFKA-15305: -- Summary: The background thread should try to process the remaining task until the shutdown timer is expired Key: KAFKA-15305 URL: https://issues.apache.org/jira/browse/KAFKA-15305 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Philip Nee Assignee: Philip Nee While working on https://issues.apache.org/jira/browse/KAFKA-15304: the close() API supplies a timeout parameter so that the consumer can have a grace period to process things before shutting down. The background thread currently doesn't do that: when close() is initiated, it immediately closes all of its dependencies. This might not be desirable because there could be remaining tasks to be processed before closing. Maybe the correct thing to do is to first stop accepting API requests; second, let runOnce() continue to run until the shutdown timer expires; then we can force-close all of its dependencies. -- This message was sent by Atlassian Jira (v8.20.10#820010)
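The close sequence sketched in the ticket could look roughly like this. This is a hedged illustration under invented names (GracefulShutdownDemo is not the actual consumer background thread): stop accepting new API requests, keep invoking runOnce() until the remaining tasks drain or the shutdown timer expires, then force-close whatever is left.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative model of a background thread draining remaining tasks
// within a close() grace period.
class GracefulShutdownDemo {
    final Queue<Runnable> tasks = new ArrayDeque<>();
    volatile boolean acceptingRequests = true;
    int processed = 0;

    // One iteration of the background loop: process at most one pending task.
    void runOnce() {
        Runnable t = tasks.poll();
        if (t != null) { t.run(); processed++; }
    }

    // Returns how many tasks were still unprocessed when we force-closed.
    int close(long timeoutMs) {
        acceptingRequests = false;                          // 1. stop accepting API requests
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!tasks.isEmpty() && System.currentTimeMillis() < deadline) {
            runOnce();                                      // 2. keep running until drained or expired
        }
        int remaining = tasks.size();
        tasks.clear();                                      // 3. force-close dependencies
        return remaining;
    }
}
```

With this shape, a close() with a generous timeout drains everything, while a zero timeout degenerates to today's immediate shutdown.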
[jira] [Updated] (KAFKA-15020) integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor test is flaky
[ https://issues.apache.org/jira/browse/KAFKA-15020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15020: -- Component/s: unit tests > integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor > test is flaky > -- > > Key: KAFKA-15020 > URL: https://issues.apache.org/jira/browse/KAFKA-15020 > Project: Kafka > Issue Type: Test > Components: unit tests >Reporter: Atul Sharma >Priority: Major > Labels: flaky-test > > Sometimes the test fails with the following log: > {code:java} > Gradle Test Run :core:integrationTest > Gradle Test Executor 175 > > FetchFromFollowerIntegrationTest > testRackAwareRangeAssignor() FAILED > org.opentest4j.AssertionFailedError: Consumed 0 records before timeout > instead of the expected 2 records > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:135) > at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1087) > at > integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$11(FetchFromFollowerIntegrationTest.scala:216) > at > integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$11$adapted(FetchFromFollowerIntegrationTest.scala:215) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at > integration.kafka.server.FetchFromFollowerIntegrationTest.verifyAssignments$1(FetchFromFollowerIntegrationTest.scala:215) > at > integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor(FetchFromFollowerIntegrationTest.scala:244) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15101) Improve testRackAwareRangeAssignor test
[ https://issues.apache.org/jira/browse/KAFKA-15101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15101: -- Labels: flaky-test (was: ) > Improve testRackAwareRangeAssignor test > --- > > Key: KAFKA-15101 > URL: https://issues.apache.org/jira/browse/KAFKA-15101 > Project: Kafka > Issue Type: Improvement >Reporter: David Jacot >Priority: Major > Labels: flaky-test > > testRackAwareRangeAssignor has been really flaky recently. As a mitigation, > we have increased the timeouts to 30s in the test. This should already > improve it. However, it may be better to revise the entire test to avoid > those. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15101) Improve testRackAwareRangeAssignor test
[ https://issues.apache.org/jira/browse/KAFKA-15101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15101: -- Component/s: unit tests > Improve testRackAwareRangeAssignor test > --- > > Key: KAFKA-15101 > URL: https://issues.apache.org/jira/browse/KAFKA-15101 > Project: Kafka > Issue Type: Improvement > Components: unit tests >Reporter: David Jacot >Priority: Major > Labels: flaky-test > > testRackAwareRangeAssignor has been really flaky recently. As a mitigation, > we have increased the timeouts to 30s in the test. This should already > improve it. However, it may be better to revise the entire test to avoid > those. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15101) Improve testRackAwareRangeAssignor test
[ https://issues.apache.org/jira/browse/KAFKA-15101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751187#comment-17751187 ] Kirk True commented on KAFKA-15101: --- Looks like a duplicate of KAFKA-15020. > Improve testRackAwareRangeAssignor test > --- > > Key: KAFKA-15101 > URL: https://issues.apache.org/jira/browse/KAFKA-15101 > Project: Kafka > Issue Type: Improvement >Reporter: David Jacot >Priority: Major > > testRackAwareRangeAssignor has been really flaky recently. As a mitigation, > we have increased the timeouts to 30s in the test. This should already > improve it. However, it may be better to revise the entire test to avoid > those. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15304) CompletableApplicationEvents aren't being completed when the consumer is closing
Philip Nee created KAFKA-15304: -- Summary: CompletableApplicationEvents aren't being completed when the consumer is closing Key: KAFKA-15304 URL: https://issues.apache.org/jira/browse/KAFKA-15304 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Assignee: Philip Nee If the background thread is closed before ingesting all ApplicationEvents, we should drain the background queue and try to cancel these events before closing. We can try to process these events before closing down the consumer; however, we assume that when the user issues a close command, the consumer should be shut down promptly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
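The drain-and-cancel behaviour described in KAFKA-15304 can be sketched with plain JDK types. This is a minimal sketch, not the consumer's actual internals: `AppEvent` is a hypothetical stand-in for a `CompletableApplicationEvent`, and the real close path lives in the consumer's background thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

public class EventDrainSketch {
    // Hypothetical stand-in for a completable application event: the caller
    // blocks on the future, and the background thread normally completes it.
    static class AppEvent {
        final CompletableFuture<Void> future = new CompletableFuture<>();
    }

    // On close, drain whatever is still queued and cancel each event, so no
    // caller is left waiting forever on an event that will never be processed.
    static void drainAndCancel(BlockingQueue<AppEvent> queue) {
        List<AppEvent> pending = new ArrayList<>();
        queue.drainTo(pending);
        for (AppEvent event : pending) {
            event.future.completeExceptionally(
                new CancellationException("consumer is closing"));
        }
    }

    public static void main(String[] args) {
        BlockingQueue<AppEvent> queue = new LinkedBlockingQueue<>();
        AppEvent event = new AppEvent();
        queue.add(event);
        drainAndCancel(queue);
        System.out.println(event.future.isCompletedExceptionally()); // prints "true"
        System.out.println(queue.isEmpty());                          // prints "true"
    }
}
```

Completing the futures exceptionally (rather than silently dropping the events) matches the stated goal of shutting down promptly while still unblocking any waiting callers.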
[GitHub] [kafka] ruslankrivoshein commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools
ruslankrivoshein commented on PR #13562: URL: https://github.com/apache/kafka/pull/13562#issuecomment-1665982738 @fvaleri I summon thee. Please, take one more look -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on a diff in pull request #14123: MINOR: Fix committed API in the PrototypeAsyncConsumer timeout
lianetm commented on code in PR #14123: URL: https://github.com/apache/kafka/pull/14123#discussion_r1284689796 ## core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala: ## @@ -35,8 +39,12 @@ class BaseAsyncConsumerTest extends AbstractConsumerTest { consumer.commitAsync(cb) waitUntilTrue(() => { cb.successCount == 1 -}, "wait until commit is completed successfully", 5000) +}, "wait until commit is completed successfully", defaultBlockingAPITimeoutMs) +val committedOffset = consumer.committed(Set(tp).asJava, Duration.ofMillis(defaultBlockingAPITimeoutMs)) + assertTrue(consumer.assignment.contains(tp)) +assertNotNull(committedOffset) +assertNull(committedOffset.get(tp)) Review Comment: Since this test is for no-consumption then the `sendRecords` could be removed in both tests I guess -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14682) Unused stubbings are not reported by Mockito during CI builds
[ https://issues.apache.org/jira/browse/KAFKA-14682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14682: -- Description: We've started using [strict stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html] for unit tests written with Mockito, which is supposed to automatically fail tests when they set up mock expectations that go unused. However, these failures are not reported during Jenkins builds, even if they are reported when building/testing locally. In at least one case, this difference appears to be because our [Jenkins build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35] uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the project's [Gradle build file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543], instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the latter instead of the former, which can cause tests to fail due to unnecessary stubbings when being run in that IDE but not when being built on Jenkins. It's possible that, because the custom test tasks filter out some tests from running, Mockito does not check for unnecessary stubbings in order to avoid incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} method. This exact behavior has been reported elsewhere as a [Gradle issue|https://github.com/gradle/gradle/issues/10694]; based on [discussion on that thread|https://github.com/gradle/gradle/issues/10694#issuecomment-1374911274], it appears this is a known and somewhat-intentional limitation of Mockito: {quote}I spent some time trying to solve this and eventually I stumbled upon this piece in Mockito's JUnit runner: [https://github.com/mockito/mockito/blob/main/src/main/java/org/mockito/internal/runners/StrictRunner.java#L47-L53] // only report when: // 1. 
if all tests from given test have ran (filter requested is false) // Otherwise we would report unnecessary stubs even if the user runs just single test // from the class // 2. tests are successful (we don't want to add an extra failure on top of any existing // failure, to avoid confusion) (1) suggests that skipping unused stub validation is the intended behavior when the user filters a single test from the class. However, this behavior applies to any type of filter. And Gradle indeed applies a {{CategoryFilter}} if categories are configured: [https://github.com/rieske/gradle/blob/e82029abb559d620fdfecb4708a95c6313af625c/subprojects/testing-jvm-infrastructure/src/main/java/org/gradle/api/internal/tasks/testing/junit/JUnitTestClassExecutor.java#L70-L96] Which then causes Mockito to not validate unused stubs. {quote} was: We've started using [strict stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html] for unit tests written with Mockito, which is supposed to automatically fail tests when they set up mock expectations that go unused. However, these failures are not reported during Jenkins builds, even if they are reported when building/testing locally. In at least one case, this difference appears to be because our [Jenkins build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35] uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the project's [Gradle build file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543], instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the latter instead of the former, which can cause tests to fail due to unnecessary stubbings when being run in that IDE but not when being built on Jenkins. 
It's possible that, because the custom test tasks filter out some tests from running, Mockito does not check for unnecessary stubbings in order to avoid incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} method. This exact behavior has been reported elsewhere as a [Gradle issue|https://github.com/gradle/gradle/issues/10694]; based on [discussion on that thread|https://github.com/gradle/gradle/issues/10694#issuecomment-1374911274], it appears this is a known and somewhat-intentional limitation of Mockito: {quote}I spent some time trying to solve this and eventually I stumbled upon this piece in Mockito's JUnit runner: [https://github.com/mockito/mockito/blob/main/src/main/java/org/mockito/internal/runners/StrictRunner.java#L47-L53] {{ }} {{}} {{ // only report when: // 1. if all tests from given test have ran (filter requested is false) // Otherwise we would report unnecessary stubs even if the user runs just single test // from the class // 2. tests are successful (we
[jira] [Commented] (KAFKA-14682) Unused stubbings are not reported by Mockito during CI builds
[ https://issues.apache.org/jira/browse/KAFKA-14682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751178#comment-17751178 ] Chris Egerton commented on KAFKA-14682: --- I've taken a stab at an upstream Mockito fix. If it's accepted, we can either bump to a new version with that fix, or look into tweaking our Jenkinsfile to run the {{test}} task instead of {{{}unitTest integrationTest{}}}. > Unused stubbings are not reported by Mockito during CI builds > - > > Key: KAFKA-14682 > URL: https://issues.apache.org/jira/browse/KAFKA-14682 > Project: Kafka > Issue Type: Test > Components: unit tests >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > We've started using [strict > stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html] > for unit tests written with Mockito, which is supposed to automatically fail > tests when they set up mock expectations that go unused. > However, these failures are not reported during Jenkins builds, even if they > are reported when building/testing locally. > In at least one case, this difference appears to be because our [Jenkins > build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35] > uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the > project's [Gradle build > file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543], > instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the > latter instead of the former, which can cause tests to fail due to > unnecessary stubbings when being run in that IDE but not when being built on > Jenkins. > It's possible that, because the custom test tasks filter out some tests from > running, Mockito does not check for unnecessary stubbings in order to avoid > incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} > method. 
> > This exact behavior has been reported elsewhere as a [Gradle > issue|https://github.com/gradle/gradle/issues/10694]; based on [discussion on > that > thread|https://github.com/gradle/gradle/issues/10694#issuecomment-1374911274], > it appears this is a known and somewhat-intentional limitation of Mockito: > {quote}I spent some time trying to solve this and eventually I stumbled upon > this piece in Mockito's JUnit runner: > [https://github.com/mockito/mockito/blob/main/src/main/java/org/mockito/internal/runners/StrictRunner.java#L47-L53] > {{ }} > {{}} > {{ // only report when: > // 1. if all tests from given test have ran (filter requested is > false) > // Otherwise we would report unnecessary stubs even if the user > runs just single test > // from the class > // 2. tests are successful (we don't want to add an extra failure > on top of any existing > // failure, to avoid confusion)}} > > (1) suggests that skipping unused stub validation is the intended behavior > when the user filters a single test from the class. However, this behavior > applies to any type of filter. > And Gradle indeed applies a {{CategoryFilter}} if categories are configured: > [https://github.com/rieske/gradle/blob/e82029abb559d620fdfecb4708a95c6313af625c/subprojects/testing-jvm-infrastructure/src/main/java/org/gradle/api/internal/tasks/testing/junit/JUnitTestClassExecutor.java#L70-L96] > Which then causes Mockito to not validate unused stubs. > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
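The gating logic quoted from Mockito's `StrictRunner` boils down to two boolean conditions. A stdlib-only restatement (not Mockito's actual code) makes the failure mode easy to see: because Gradle's `CategoryFilter` counts as a requested filter, even a fully green CI build skips the unused-stubbing check.

```java
public class StrictStubGateSketch {
    // Mirrors the two conditions quoted above: unused stubbings are only
    // reported when (1) no filter was requested and (2) all tests passed.
    static boolean shouldReportUnusedStubs(boolean filterRequested, boolean testsSuccessful) {
        return !filterRequested && testsSuccessful;
    }

    public static void main(String[] args) {
        // Jenkins build with a category filter applied: check is skipped.
        System.out.println(shouldReportUnusedStubs(true, true));  // prints "false"
        // Local/IDE run of the plain `test` task with no filter: check runs.
        System.out.println(shouldReportUnusedStubs(false, true)); // prints "true"
    }
}
```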
[jira] [Assigned] (KAFKA-14682) Unused stubbings are not reported by Mockito during CI builds
[ https://issues.apache.org/jira/browse/KAFKA-14682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reassigned KAFKA-14682: - Assignee: Chris Egerton (was: Christo Lolov) > Unused stubbings are not reported by Mockito during CI builds > - > > Key: KAFKA-14682 > URL: https://issues.apache.org/jira/browse/KAFKA-14682 > Project: Kafka > Issue Type: Test > Components: unit tests >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > We've started using [strict > stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html] > for unit tests written with Mockito, which is supposed to automatically fail > tests when they set up mock expectations that go unused. > However, these failures are not reported during Jenkins builds, even if they > are reported when building/testing locally. > In at least one case, this difference appears to be because our [Jenkins > build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35] > uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the > project's [Gradle build > file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543], > instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the > latter instead of the former, which can cause tests to fail due to > unnecessary stubbings when being run in that IDE but not when being built on > Jenkins. > It's possible that, because the custom test tasks filter out some tests from > running, Mockito does not check for unnecessary stubbings in order to avoid > incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} > method. 
> > This exact behavior has been reported elsewhere as a [Gradle > issue|https://github.com/gradle/gradle/issues/10694]; based on [discussion on > that > thread|https://github.com/gradle/gradle/issues/10694#issuecomment-1374911274], > it appears this is a known and somewhat-intentional limitation of Mockito: > {quote}I spent some time trying to solve this and eventually I stumbled upon > this piece in Mockito's JUnit runner: > [https://github.com/mockito/mockito/blob/main/src/main/java/org/mockito/internal/runners/StrictRunner.java#L47-L53] > {{ }} > {{}} > {{ // only report when: > // 1. if all tests from given test have ran (filter requested is > false) > // Otherwise we would report unnecessary stubs even if the user > runs just single test > // from the class > // 2. tests are successful (we don't want to add an extra failure > on top of any existing > // failure, to avoid confusion)}} > > (1) suggests that skipping unused stub validation is the intended behavior > when the user filters a single test from the class. However, this behavior > applies to any type of filter. > And Gradle indeed applies a {{CategoryFilter}} if categories are configured: > [https://github.com/rieske/gradle/blob/e82029abb559d620fdfecb4708a95c6313af625c/subprojects/testing-jvm-infrastructure/src/main/java/org/gradle/api/internal/tasks/testing/junit/JUnitTestClassExecutor.java#L70-L96] > Which then causes Mockito to not validate unused stubs. > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15298) Disable DeleteRecords on Tiered Storage topics
[ https://issues.apache.org/jira/browse/KAFKA-15298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751172#comment-17751172 ] Christo Lolov commented on KAFKA-15298: --- Heya [~ckamal]! Thanks for pointing me to the pull request. I had not reviewed it so I wasn't aware of this particular change (https://github.com/apache/kafka/pull/13561/files#diff-10e27a71dc3dec3463df7752ab07f6227cf70bcee70a93a86b5984e020beae05L984) which I believe addresses my concern. Give me some time to review the pull request and I will close this Jira ticket if there is no need for it anymore. > Disable DeleteRecords on Tiered Storage topics > -- > > Key: KAFKA-15298 > URL: https://issues.apache.org/jira/browse/KAFKA-15298 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > Labels: tiered-storage > > Currently the DeleteRecords API does not work with Tiered Storage. We should > ensure that this is reflected in the responses that clients get when trying > to use the API with tiered topics. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] philipnee opened a new pull request, #14154: [TEST] Testing build failures for the commit
philipnee opened a new pull request, #14154: URL: https://github.com/apache/kafka/pull/14154 There are a lot of random build failures, and we aren't sure why. Starting a fresh build to see if these build failures persist.

[GitHub] [kafka] bachmanity1 commented on pull request #14153: KAFKA-7438: Replace Easymock & Powermock with Mockito in KafkaBasedLogTest
bachmanity1 commented on PR #14153: URL: https://github.com/apache/kafka/pull/14153#issuecomment-1665858277 @C0urante @mjsax @cadonna could you please review this?
[GitHub] [kafka] bachmanity1 opened a new pull request, #14153: KAFKA-7438: Replace Easymock & Powermock with Mockito in KafkaBasedLogTest
bachmanity1 opened a new pull request, #14153: URL: https://github.com/apache/kafka/pull/14153 Replaced Easymock & Powermock with Mockito in KafkaBasedLogTest. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Comment Edited] (KAFKA-15239) producerPerformance system test for old client failed after v3.5.0
[ https://issues.apache.org/jira/browse/KAFKA-15239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746622#comment-17746622 ] Federico Valeri edited comment on KAFKA-15239 at 8/4/23 3:58 PM: - Hi [~showuon], thanks for raising this. QuotaTest uses ProducerPerformanceService which is where the issue is located. This is the test instance that is failing. {code} [INFO:2023-07-24 05:00:40,057]: RunnerClient: Loading test {'directory': '/opt/kafka-dev/tests/kafkatest/tests/client', 'file_name': 'quota_test.py', 'cls_name': 'QuotaTest', 'method_name': 'test_quota', 'injected_args': {'quota_type': 'client-id', 'old_client_throttling_behavior': True}} ... Exception: No output from ProducerPerformance {code} If we look at ProducerPerformanceService stderr logs in ./results, we have the reported exception. {code} Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/ThroughputThrottler at org.apache.kafka.tools.ProducerPerformance.start(ProducerPerformance.java:101) at org.apache.kafka.tools.ProducerPerformance.main(ProducerPerformance.java:52) Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.ThroughputThrottler at java.net.URLClassLoader.findClass(URLClassLoader.java:387) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 2 more {code} There are other system tests that use ProducerPerformanceService, and are failing for the same reason. 
{code} TC_PATHS="tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota" _DUCKTAPE_OPTIONS="--max-parallel=1 --deflake=1 --parameters '{\"quota_type\":\"client-id\",\"old_client_throttling_behavior\":true}'" bash tests/docker/run_tests.sh TC_PATHS="tests/kafkatest/sanity_checks/test_performance_services.py::PerformanceServiceTest.test_version" _DUCKTAPE_OPTIONS="--max-parallel=1 --deflake=1 --parameters '{\"version\":\"0.8.2.2\",\"new_consumer\":false}'" bash tests/docker/run_tests.sh TC_PATHS="tests/kafkatest/sanity_checks/test_performance_services.py::PerformanceServiceTest.test_version" _DUCKTAPE_OPTIONS="--max-parallel=1 --deflake=1 --parameters '{\"version\":\"0.9.0.1\"}'" bash tests/docker/run_tests.sh TC_PATHS="tests/kafkatest/sanity_checks/test_performance_services.py::PerformanceServiceTest.test_version" _DUCKTAPE_OPTIONS="--max-parallel=1 --deflake=1 --parameters '{\"version\":\"0.9.0.1\",\"new_consumer\":false}'" bash tests/docker/run_tests.sh TC_PATHS="tests/kafkatest/sanity_checks/test_performance_services.py::PerformanceServiceTest.test_version" _DUCKTAPE_OPTIONS="--max-parallel=1 --deflake=1 --parameters '{\"version\":\"1.1.1\",\"new_consumer\":false}'" bash tests/docker/run_tests.sh {code} The following commit broke all of them. QuotaTest was working with 3.5.0 revision and no more after cherry picking this commit. This is only on trunk, so there is no need to mention anywhere. https://github.com/apache/kafka/commit/125dbb92867c11739afc54c5e546f551f70d7113 Working on a fix. was (Author: fvaleri): Hi [~showuon], thanks for raising this. QuotaTest uses ProducerPerformanceService which is where the issue is located. This is the test instance that is failing. 
{code} [INFO:2023-07-24 05:00:40,057]: RunnerClient: Loading test {'directory': '/opt/kafka-dev/tests/kafkatest/tests/client', 'file_name': 'quota_test.py', 'cls_name': 'QuotaTest', 'method_name': 'test_quota', 'injected_args': {'quota_type': 'client-id', 'old_client_throttling_behavior': True}} ... Exception: No output from ProducerPerformance {code} If we look at ProducerPerformanceService stderr logs in ./results, we have the reported exception. {code} Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/ThroughputThrottler at org.apache.kafka.tools.ProducerPerformance.start(ProducerPerformance.java:101) at org.apache.kafka.tools.ProducerPerformance.main(ProducerPerformance.java:52) Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.ThroughputThrottler at java.net.URLClassLoader.findClass(URLClassLoader.java:387) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 2 more {code} There are other system test instances that use ProducerPerformanceService and are failing for the same reason. This is the full list. {code} TC_PATHS="tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota" _DUCKTAPE_OPTIONS="--max-parallel=1 --deflake=1 --parameters '{\"quota_type\":\"client-id\",\"old_client_throttling_behavior\":true}'" bash tests/docker/run_tests.sh
[GitHub] [kafka] fvaleri commented on pull request #14092: KAFKA-15239: Fix system tests using producer performance service
fvaleri commented on PR #14092: URL: https://github.com/apache/kafka/pull/14092#issuecomment-1665804677 @gharris1727 I tried various things, as I'm also learning how this works. The best solution I found is to move the ThroughputThrottler class from clients to server-common, adding this extra implementation dependency to connect:runtime (last commit on this branch). Why server-common? AFAIU, server-common hosts all classes shared by different modules, tools and connect:runtime in this case. Unlike tools, I think it makes sense to have this dependency for connect:runtime, as it may be used for other classes in the future. That way, we won't move the compatibility boundary. System tests will continue to use ProducerPerformance from dev branch as before, without bringing in dev clients, but using the version under test. Let me know if you see any issue with this approach. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
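The dependency move described above would look roughly like this in Kafka's root `build.gradle` (a sketch of the idea, not the actual diff from the PR):

```groovy
project(':connect:runtime') {
  dependencies {
    // ThroughputThrottler moves from clients to server-common, so
    // connect:runtime picks up server-common on its implementation classpath.
    implementation project(':server-common')
  }
}
```

Because `server-common` already hosts classes shared across modules, adding it as an `implementation` dependency keeps the class off connect:runtime's public API and avoids shifting the client compatibility boundary.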
[GitHub] [kafka] philipnee commented on pull request #14118: KAFKA-14875: Implement wakeup
philipnee commented on PR #14118: URL: https://github.com/apache/kafka/pull/14118#issuecomment-1665733961 I've been seeing stream test failures in the recent commits too! This PR needs to be rebased against this PR: https://github.com/apache/kafka/pull/14123 to resolve some of the failing committed tests. Sorry about that @kirktrue
[jira] [Updated] (KAFKA-14682) Unused stubbings are not reported by Mockito during CI builds
[ https://issues.apache.org/jira/browse/KAFKA-14682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14682: -- Description: We've started using [strict stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html] for unit tests written with Mockito, which is supposed to automatically fail tests when they set up mock expectations that go unused. However, these failures are not reported during Jenkins builds, even if they are reported when building/testing locally. In at least one case, this difference appears to be because our [Jenkins build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35] uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the project's [Gradle build file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543], instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the latter instead of the former, which can cause tests to fail due to unnecessary stubbings when being run in that IDE but not when being built on Jenkins. It's possible that, because the custom test tasks filter out some tests from running, Mockito does not check for unnecessary stubbings in order to avoid incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} method. This exact behavior has been reported elsewhere as a [Gradle issue|https://github.com/gradle/gradle/issues/10694]; based on [discussion on that thread|https://github.com/gradle/gradle/issues/10694#issuecomment-1374911274], it appears this is a known and somewhat-intentional limitation of Mockito: {quote}I spent some time trying to solve this and eventually I stumbled upon this piece in Mockito's JUnit runner: [https://github.com/mockito/mockito/blob/main/src/main/java/org/mockito/internal/runners/StrictRunner.java#L47-L53] {{ }} {{}} {{ // only report when: // 1. 
if all tests from given test have ran (filter requested is false) // Otherwise we would report unnecessary stubs even if the user runs just single test // from the class // 2. tests are successful (we don't want to add an extra failure on top of any existing // failure, to avoid confusion)}} (1) suggests that skipping unused stub validation is the intended behavior when the user filters a single test from the class. However, this behavior applies to any type of filter. And Gradle indeed applies a {{CategoryFilter}} if categories are configured: [https://github.com/rieske/gradle/blob/e82029abb559d620fdfecb4708a95c6313af625c/subprojects/testing-jvm-infrastructure/src/main/java/org/gradle/api/internal/tasks/testing/junit/JUnitTestClassExecutor.java#L70-L96] Which then causes Mockito to not validate unused stubs. {quote} was: We've started using [strict stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html] for unit tests written with Mockito, which is supposed to automatically fail tests when they set up mock expectations that go unused. However, these failures are not reported during Jenkins builds, even if they are reported when building/testing locally. In at least one case, this difference appears to be because our [Jenkins build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35] uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the project's [Gradle build file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543], instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the latter instead of the former, which can cause tests to fail due to unnecessary stubbings when being run in that IDE but not when being built on Jenkins. 
It's possible that, because the custom test tasks filter out some tests from running, Mockito does not check for unnecessary stubbings in order to avoid incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} method. > Unused stubbings are not reported by Mockito during CI builds > - > > Key: KAFKA-14682 > URL: https://issues.apache.org/jira/browse/KAFKA-14682 > Project: Kafka > Issue Type: Test > Components: unit tests >Reporter: Chris Egerton >Assignee: Christo Lolov >Priority: Major > > We've started using [strict > stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html] > for unit tests written with Mockito, which is supposed to automatically fail > tests when they set up mock expectations that go unused. > However, these failures are
[jira] [Commented] (KAFKA-14682) Unused stubbings are not reported by Mockito during CI builds
[ https://issues.apache.org/jira/browse/KAFKA-14682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751125#comment-17751125 ] Chris Egerton commented on KAFKA-14682: --- [~christo_lolov] were you still planning on working on this? If not, I can take over. > Unused stubbings are not reported by Mockito during CI builds > - > > Key: KAFKA-14682 > URL: https://issues.apache.org/jira/browse/KAFKA-14682 > Project: Kafka > Issue Type: Test > Components: unit tests >Reporter: Chris Egerton >Assignee: Christo Lolov >Priority: Major > > We've started using [strict > stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html] > for unit tests written with Mockito, which is supposed to automatically fail > tests when they set up mock expectations that go unused. > However, these failures are not reported during Jenkins builds, even if they > are reported when building/testing locally. > In at least one case, this difference appears to be because our [Jenkins > build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35] > uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the > project's [Gradle build > file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543], > instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the > latter instead of the former, which can cause tests to fail due to > unnecessary stubbings when being run in that IDE but not when being built on > Jenkins. > It's possible that, because the custom test tasks filter out some tests from > running, Mockito does not check for unnecessary stubbings in order to avoid > incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} > method. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] satishd merged pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
satishd merged PR #13984: URL: https://github.com/apache/kafka/pull/13984 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
satishd commented on PR #13984: URL: https://github.com/apache/kafka/pull/13984#issuecomment-1665559532 Right, there are a few unrelated test failures, merging it to trunk.
[jira] [Updated] (KAFKA-15303) Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema
[ https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Charles-Eddy updated KAFKA-15303: - Attachment: image (1).png > Foreign key joins no longer triggered by events on the right side of the join > after deployment with a new compatible Avro schema > > > Key: KAFKA-15303 > URL: https://issues.apache.org/jira/browse/KAFKA-15303 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.4.0 >Reporter: Charles-Eddy >Priority: Critical > Attachments: image (1).png > > > Hello everyone, I am currently working on a project that uses Kafka Streams > (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK. > Our goal is to join offer information from our sellers with additional data > from various input topics, and then feed the resulting joined information > into an output topic. > Our application is deployed in Kubernetes using the StatefulSet feature, with > one EBS volume per Kafka Streams pod and 5 Streams Threads per pod. > We are using avro to serialize / deserialize input topics and storing in the > state stores of Kafka streams. > We have encountered a bug in Kafka Streams that prevents us from deploying > new versions of Kafka Streams containing new compatible Avro schemas of our > input topics. > The symptom is that after deploying our new version, which contains no > changes in topology but only changes to the Avro schema used, we discard > every event coming from the right part of the join concerned by these Avro > schema changes until we receive something from the left part of the join. > As a result, we are losing events and corrupting our output topics and stores > with outdated information. > After checking the local help for the priority to assign, I have assigned it > as *CRITICAL* because we are losing data (for example, tombstones are not > propagated to the following joins, so some products are still visible on our > website when they should not be). 
> Please feel free to change the priority if you think it is not appropriate.
>
> *The bug:*
> After multiple hours of investigation we found out that the bug is located in the foreign key join feature, specifically in the *SubscriptionResolverJoinProcessorSupplier* class on the left side of a foreign key join.
> Its process(...) method computes a hash from the local store, by serializing a value deserialized from the left state store, and compares it with the hash of the original message from the subscription-response-topic.
>
> It means that when we deploy a new version of our Kafka Streams instance with a new compatible Avro schema on the left side of a join, every join triggered by the right side of the join is invalidated until we receive all the events again on the left side. Every join triggered by the right side is discarded because all the hashes computed by Kafka Streams now differ from those of the original messages.
>
> *How to reproduce it:*
> If we take a working and a non-working workflow, it will do something like this:
> +Normal non-breaking+ workflow from the left part of the FK join:
> # A new offer event occurs. The offer is received and stored (v1).
> # A subscription registration is sent with the offer-hash (v1).
> # The subscription is saved to the store with the v1 offer-hash.
> # Product data is searched for.
> # If product data is found, a subscription response is sent back, including the v1 offer-hash.
> # The offer data in the store is searched for and the offer hashes between the store (v1) and response event (also v1) are compared.
> # Finally, the join result is sent.
> New product event from the right part of the FK join:
> # The product is received and stored.
> # All related offers in the registration store are searched for.
> # A subscription response is sent for each offer, including their offer hash (v1).
> # The offer data in the store is searched for and the offer hashes between > the store (v1) and response event (also v1) are compared. > # Finally, the join result is sent. > > +Breaking workflow:+ > The offer serializer is changed to offer v2 > New product event from the right part of the FK join: > # The product is received and stored. > # All related offers in the registration store are searched for. > # A subscription response is sent for each offer, including their offer hash > (v1). > # The offer data in the store is searched for and the offer hashes between > the store (v2) and response event (still v1) are compared. > # The join result is not sent since the hash is different. > > *Potential
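The breaking step described above (store hash computed with schema v2 versus a response hash computed with v1) can be illustrated with plain byte serialization in place of Avro. In the sketch below, `serializeV1`/`serializeV2` are hypothetical stand-ins for the two compatible Avro schema versions, and `Arrays.hashCode` stands in for the murmur hash the real `SubscriptionResolverJoinProcessorSupplier` applies to the serialized value:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class FkJoinHashSketch {
    // Hypothetical "schema v1" serializer: offer id and price only.
    static byte[] serializeV1(String offerId, long price) {
        return (offerId + ":" + price).getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical "schema v2": a compatible change that adds a defaulted
    // field, so the serialized bytes differ for the same logical value.
    static byte[] serializeV2(String offerId, long price) {
        return (offerId + ":" + price + ":currency=EUR").getBytes(StandardCharsets.UTF_8);
    }

    static long hash(byte[] serialized) {
        return Arrays.hashCode(serialized);
    }

    // The subscription response carries the hash computed at registration
    // time; the left side re-serializes its stored value with the *current*
    // serializer and drops the join result when the hashes differ.
    static boolean joinStillFires(long responseHash, byte[] storeSerialized) {
        return responseHash == hash(storeSerialized);
    }

    public static void main(String[] args) {
        long registeredHash = hash(serializeV1("offer-1", 999));
        // Same serializer on both sides: hashes match, the join fires.
        System.out.println(joinStillFires(registeredHash, serializeV1("offer-1", 999)));
        // After redeploying with schema v2: same logical value, different
        // bytes, so pending responses from the right side are discarded.
        System.out.println(joinStillFires(registeredHash, serializeV2("offer-1", 999)));
    }
}
```

This is why the events are lost only until the left side is written again: a fresh left-side write re-registers the subscription with a hash computed under the new schema, after which the comparison succeeds again.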
[jira] [Commented] (KAFKA-15267) Cluster-wide disablement of Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751099#comment-17751099 ] Christo Lolov commented on KAFKA-15267: --- Okay, I will send out an email later today
> Cluster-wide disablement of Tiered Storage
> --
>
> Key: KAFKA-15267
> URL: https://issues.apache.org/jira/browse/KAFKA-15267
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Christo Lolov
> Assignee: Christo Lolov
> Priority: Major
> Labels: tiered-storage
>
> h2. Summary
> KIP-405 defines the configuration {{remote.log.storage.system.enable}} which controls whether all resources needed for Tiered Storage to function are instantiated properly in Kafka. However, the interaction between remote data and Kafka is undefined if that configuration is set to false while there are still topics with {{remote.storage.enable}}. {color:#ff8b00}*We would like to give customers the ability to switch off Tiered Storage on a cluster level and as such would need to define the behaviour.*{color}
> {{remote.log.storage.system.enable}} is a read-only configuration. This means that it can only be changed by *modifying the server.properties* and restarting brokers. As such, the {*}validity of values contained in it is only checked at broker startup{*}.
> This JIRA proposes a few behaviours and a recommendation on a way forward.
> h2. Option 1: Change nothing
> Pros:
> * No operation.
> Cons:
> * We do not solve the problem of moving back to older (or newer) Kafka versions not supporting TS.
> h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster level and do not allow it to be disabled
> Always instantiate all resources for tiered storage. If no special ones are selected use the default ones which come with Kafka.
> Pros:
> * We solve the problem for moving between versions not allowing TS to be disabled.
> Cons: > * We do not solve the problem of moving back to older (or newer) Kafka > versions not supporting TS. > * We haven’t quantified how much computer resources (CPU, memory) idle TS > components occupy. > * TS is a feature not required for running Kafka. As such, while it is still > under development we shouldn’t put it on the critical path of starting a > broker. In this way, a stray memory leak won’t impact anything on the > critical path of a broker. > * We are potentially swapping one problem for another. How does TS behave if > one decides to swap the TS plugin classes when data has already been written? > h2. Option 3: Hide topics with tiering enabled > Customers cannot interact with topics which have tiering enabled. They cannot > create new topics with the same names. Retention (and compaction?) do not > take effect on files already in local storage. > Pros: > * We do not force data-deletion. > Cons: > * This will be quite involved - the controller will need to know when a > broker’s server.properties have been altered; the broker will need to not > proceed to delete logs it is not the leader or follower for. > h2. {color:#00875a}Option 4: Do not start the broker if there are topics with > tiering enabled{color} - Recommended > This option has 2 different sub-options. The first one is that TS cannot be > disabled on cluster-level if there are *any* tiering topics - in other words > all tiered topics need to be deleted. The second one is that TS cannot be > disabled on a cluster-level if there are *any* topics with *tiering enabled* > - they can have tiering disabled, but with a retention policy set to delete > or retain (as per > [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]). > A topic can have tiering disabled and remain on the cluster as long as there > is no *remote* data when TS is disabled cluster-wide. 
> Pros: > * We force the customer to be very explicit in disabling tiering of topics > prior to disabling TS on the whole cluster. > Cons: > * You have to make certain that all data in remote is deleted (just a > disablement of tiered topics is not enough). How do you determine whether all > remote has expired if policy is retain? If retain policy in KIP-950 knows > that there is data in remote then this should also be able to figure it out. > The common denominator is that there needs to be no *remote* data at the > point of disabling TS. As such, the most straightforward option is to refuse > to start brokers if there are topics with the {{remote.storage.enabled}} > present. This in essence requires customers to clean any tiered topics before > switching off TS, which is a fair ask. Should we wish to revise this later it > should be possible. > h2. Option 5: Make Kafka forget about all remote information > Pros: > * Clean cut > Cons:
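The recommended Option 4 above boils down to a validation at broker startup: refuse to start while tiered storage is disabled cluster-wide but any topic still carries the tiering flag. A minimal sketch of such a check, with hypothetical class and method names (not actual broker code):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TieredStorageStartupCheck {

    // Sketch of Option 4: topics whose remote.storage.enabled flag conflicts
    // with tiered storage being disabled cluster-wide.
    static List<String> conflictingTopics(boolean remoteStorageSystemEnabled,
                                          Map<String, Boolean> remoteStorageEnabledByTopic) {
        if (remoteStorageSystemEnabled) {
            return List.of(); // nothing to check, TS is on cluster-wide
        }
        return remoteStorageEnabledByTopic.entrySet().stream()
                .filter(Map.Entry::getValue)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    // Refuse to start the broker while any tiered topic remains.
    static void validateStartup(boolean remoteStorageSystemEnabled,
                                Map<String, Boolean> remoteStorageEnabledByTopic) {
        List<String> conflicts = conflictingTopics(remoteStorageSystemEnabled, remoteStorageEnabledByTopic);
        if (!conflicts.isEmpty()) {
            throw new IllegalStateException(
                    "remote.log.storage.system.enable=false but remote.storage.enabled is set on: " + conflicts);
        }
    }

    public static void main(String[] args) {
        // TS enabled cluster-wide: nothing to validate.
        validateStartup(true, Map.of("tiered-topic", true, "plain-topic", false));
        // TS disabled with a tiered topic present: the broker refuses to start.
        try {
            validateStartup(false, Map.of("tiered-topic", true, "plain-topic", false));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Since {{remote.log.storage.system.enable}} is read-only, running such a check once at startup is sufficient; the check is what forces operators to delete or un-tier topics before flipping the cluster-wide switch.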
[GitHub] [kafka] ivanyu commented on pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on PR #13984: URL: https://github.com/apache/kafka/pull/13984#issuecomment-1665493163 Thank you @satishd Seems the failures aren't related
[jira] [Commented] (KAFKA-15267) Cluster-wide disablement of Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751095#comment-17751095 ] Luke Chen commented on KAFKA-15267: --- (1) and (3) are good to me. The only concern is we don't have KIP for adopting 4.1. Could we at least start a discussion thread in dev mailing list to see if there is any concern from the community?
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1284280124

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

## @@ -667,11 +675,323 @@ public void run() { } }

+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+    if (isLeader()) {
+        logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
+        updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+    }
+}
+
+class RemoteLogRetentionHandler {
+
+    private final Optional<RetentionSizeData> retentionSizeData;
+    private final Optional<RetentionTimeData> retentionTimeData;
+
+    private long remainingBreachedSize;
+
+    private OptionalLong logStartOffset = OptionalLong.empty();
+
+    public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+        this.retentionSizeData = retentionSizeData;
+        this.retentionTimeData = retentionTimeData;
+        remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
+    }
+
+    private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+        if (!retentionSizeData.isPresent()) {
+            return false;
+        }
+
+        boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+            // Assumption that segments contain size >= 0
+            if (remainingBreachedSize > 0) {
+                long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes();
+                if (remainingBytes >= 0) {
+                    remainingBreachedSize = remainingBytes;
+                    return true;
+                }
+            }
+
+            return false;
+        });
+        if (isSegmentDeleted) {
+            logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+            logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
+                metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize);
+        }
+        return isSegmentDeleted;
+    }
+
+    public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        if (!retentionTimeData.isPresent()) {
+            return false;
+        }
+
+        boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+            x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
+        if (isSegmentDeleted) {
+            remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+            // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+            // are ascending within an epoch.
+            logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+            logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
+                metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
+        }
+        return isSegmentDeleted;
+    }
+
+    private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+        if (isSegmentDeleted && retentionSizeData.isPresent()) {
+            remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+            logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+        }
+
+        // No need to update the logStartOffset.
+        return isSegmentDeleted;
+    }
+
+    // It removes the segments beyond the current leader's earliest epoch. Those segments are considered as
+    // unreferenced because they are not part of the current leader epoch lineage.
+    private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry,
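The size-based branch in the diff above walks eligible segments in order, deducting each deleted segment's size from the breached amount and advancing the candidate log start offset past it. A self-contained sketch of that loop, with simplified types and hypothetical names (the real `RemoteLogRetentionHandler` also applies the time-based and log-start-offset checks shown above):

```java
import java.util.List;

public class RetentionSizeSketch {
    // Simplified stand-in for RemoteLogSegmentMetadata: just the fields the
    // size-based retention decision needs.
    record Segment(long endOffset, long sizeInBytes) {}

    // Walks segments oldest-first and "deletes" just enough of them to bring
    // the remote log back under the retention size, returning the new log
    // start offset (or -1 if nothing was deleted). Mirrors the predicate in
    // deleteRetentionSizeBreachedSegments above.
    static long deleteUntilWithinRetention(List<Segment> segments, long breachedBytes) {
        long remainingBreachedSize = breachedBytes;
        long logStartOffset = -1L;
        for (Segment segment : segments) {
            if (remainingBreachedSize <= 0) {
                break; // no longer over the retention size, stop deleting
            }
            long remainingBytes = remainingBreachedSize - segment.sizeInBytes();
            if (remainingBytes < 0) {
                break; // deleting this whole segment would overshoot, keep it
            }
            remainingBreachedSize = remainingBytes;
            logStartOffset = segment.endOffset() + 1; // segment is deleted
        }
        return logStartOffset;
    }

    public static void main(String[] args) {
        List<Segment> segments = List.of(
                new Segment(99, 100), new Segment(199, 100), new Segment(299, 100));
        // 150 bytes over retention: only the first 100-byte segment fits fully.
        System.out.println(deleteUntilWithinRetention(segments, 150));
    }
}
```

Note the same design choice as the diff: whole segments are the unit of deletion, so the log can remain slightly over the retention size when the next segment would overshoot the breached amount.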
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1284278935

## core/src/main/scala/kafka/log/UnifiedLog.scala:

## @@ -2207,7 +2267,7 @@ case class RetentionSizeBreach(log: UnifiedLog) extends SegmentDeletionReason {
   var size = log.size
   toDelete.foreach { segment =>
     size -= segment.size
-    log.info(s"Deleting segment $segment due to retention size ${log.config.retentionSize} breach. Log size " +
+    log.info(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config)} breach. Local log size " +

Review Comment: Good catch.
[jira] [Commented] (KAFKA-15195) Regenerate segment-aligned producer snapshots when upgrading to a Kafka version supporting Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-15195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751078#comment-17751078 ] Christo Lolov commented on KAFKA-15195: --- Acknowledged, I will circle-back on this once 3.6.0 makes it out! > Regenerate segment-aligned producer snapshots when upgrading to a Kafka > version supporting Tiered Storage > - > > Key: KAFKA-15195 > URL: https://issues.apache.org/jira/browse/KAFKA-15195 > Project: Kafka > Issue Type: Sub-task > Components: core >Affects Versions: 3.6.0 >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > Fix For: 3.6.0 > > > As mentioned in KIP-405: Kafka Tiered Storage#Upgrade a customer wishing to > upgrade from a Kafka version < 2.8.0 to 3.6 and turn Tiered Storage on will > have to wait for retention to clean up segments without an associated > producer snapshot. > However, in our experience, customers of Kafka expect to be able to > immediately enable tiering on a topic once their cluster upgrade is complete. > Once they do this, however, they start seeing NPEs and no data is uploaded to > Tiered Storage > (https://github.com/apache/kafka/blob/9e50f7cdd37f923cfef4711cf11c1c5271a0a6c7/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/LogSegmentData.java#L61). > To achieve this, we propose changing Kafka to retroactively create producer > snapshot files on upload whenever a segment is due to be archived and lacks > one.
[jira] [Commented] (KAFKA-15267) Cluster-wide disablement of Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751077#comment-17751077 ] Christo Lolov commented on KAFKA-15267: --- Heya [~showuon]. Okay, I propose we do the following: * Mark this as a blocker for 3.6.0 * I take on implementing 4.1 as a solution to unblocking 3.6.0 without a KIP * I take a follow-up task on proposing a KIP for 4.2 once KIP-950 makes it in + implementing it Thoughts?
[jira] [Comment Edited] (KAFKA-14038) Optimize calculation of size for log in remote tier
[ https://issues.apache.org/jira/browse/KAFKA-14038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751076#comment-17751076 ] Christo Lolov edited comment on KAFKA-14038 at 8/4/23 10:37 AM: The remaining part of this ticket is blocked on merging https://github.com/apache/kafka/pull/13561 was (Author: christo_lolov): The remaining part of this ticket is blocked on merging https://issues.apache.org/jira/browse/KAFKA-14038 > Optimize calculation of size for log in remote tier > --- > > Key: KAFKA-14038 > URL: https://issues.apache.org/jira/browse/KAFKA-14038 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Divij Vaidya >Assignee: Christo Lolov >Priority: Major > Labels: KIP-405, needs-kip > Fix For: 3.6.0 > > > {color:#24292f}As per the Tiered Storage feature introduced in > [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage], > users can configure the retention of remote tier based on time, by size, or > both. The work of computing the log segments to be deleted based on the > retention config is [owned by > RemoteLogManager|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-1.RemoteLogManager(RLM)ThreadPool] > (RLM).{color} > {color:#24292f}To compute remote segments eligible for deletion based on > retention by size config, {color}RLM needs to compute the > {{total_remote_log_size}} i.e. the total size of logs available in the remote > tier for that topic-partition. RLM could use the > {{RemoteLogMetadataManager.listRemoteLogSegments()}} to fetch metadata for > all the remote segments and then aggregate the segment sizes by using > {{{}RemoteLogSegmentMetadata.segmentSizeInBytes(){}}}to find the total log > size stored in the remote tier. > The above method involves iterating through all metadata of all the segments > i.e. O({color:#24292f}num_remote_segments{color}) on each execution of RLM > thread. 
{color:#24292f}Since the main feature of tiered storage is storing a > large amount of data, we expect num_remote_segments to be large and a > frequent linear scan could be expensive (depending on the underlying storage > used by RemoteLogMetadataManager). > Segment offloads and segment deletions are run together in the same task and > a fixed size thread pool is shared among all topic-partitions. A slow logic > for calculation of total_log_size could result in the loss of availability as > demonstrated in the following scenario:{color} > # {color:#24292f}Calculation of total_size is slow and the threads in the > thread pool are busy with segment deletions{color} > # Segment offloads are delayed (since they run together with deletions) > # Local disk fills up, since local deletion requires the segment to be > offloaded > # If local disk is completely full, Kafka fails > Details are in KIP - > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-852%3A+Optimize+calculation+of+size+for+log+in+remote+tier] >
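The scenario above contrasts an O(num_remote_segments) scan on every RLM pass with the KIP-852 idea of maintaining the total incrementally as segments are copied and deleted. A minimal sketch with hypothetical, simplified classes (not the actual RemoteLogMetadataManager API):

```java
import java.util.ArrayList;
import java.util.List;

public class RemoteLogSizeSketch {
    // Simplified stand-in for RemoteLogSegmentMetadata.
    record SegmentMetadata(String id, long segmentSizeInBytes) {}

    private final List<SegmentMetadata> segments = new ArrayList<>();
    private long cachedTotalSize = 0L; // maintained incrementally, O(1) to read

    void onSegmentCopied(SegmentMetadata metadata) {
        segments.add(metadata);
        cachedTotalSize += metadata.segmentSizeInBytes();
    }

    void onSegmentDeleted(SegmentMetadata metadata) {
        if (segments.remove(metadata)) {
            cachedTotalSize -= metadata.segmentSizeInBytes();
        }
    }

    // The unoptimized path: aggregate every segment's size on each run of
    // the RLM thread, O(num_remote_segments) per retention pass.
    long totalSizeByScan() {
        return segments.stream().mapToLong(SegmentMetadata::segmentSizeInBytes).sum();
    }

    long totalSizeCached() {
        return cachedTotalSize;
    }

    public static void main(String[] args) {
        RemoteLogSizeSketch log = new RemoteLogSizeSketch();
        SegmentMetadata seg0 = new SegmentMetadata("seg-0", 1024);
        log.onSegmentCopied(seg0);
        log.onSegmentCopied(new SegmentMetadata("seg-1", 2048));
        log.onSegmentDeleted(seg0);
        System.out.println(log.totalSizeByScan() + " == " + log.totalSizeCached());
    }
}
```

Keeping the cached total in step with copy and delete events is what removes the per-pass linear scan from the retention path, so deletion work can no longer starve segment offloads in the shared thread pool.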
[jira] [Assigned] (KAFKA-14038) Optimize calculation of size for log in remote tier
[ https://issues.apache.org/jira/browse/KAFKA-14038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christo Lolov reassigned KAFKA-14038: - Assignee: Christo Lolov (was: Divij Vaidya) > Optimize calculation of size for log in remote tier > --- > > Key: KAFKA-14038 > URL: https://issues.apache.org/jira/browse/KAFKA-14038 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Divij Vaidya >Assignee: Christo Lolov >Priority: Major > Labels: KIP-405, needs-kip > Fix For: 3.6.0 > > > {color:#24292f}As per the Tiered Storage feature introduced in > [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage], > users can configure the retention of remote tier based on time, by size, or > both. The work of computing the log segments to be deleted based on the > retention config is [owned by > RemoteLogManager|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-1.RemoteLogManager(RLM)ThreadPool] > (RLM).{color} > {color:#24292f}To compute remote segments eligible for deletion based on > retention by size config, {color}RLM needs to compute the > {{total_remote_log_size}} i.e. the total size of logs available in the remote > tier for that topic-partition. RLM could use the > {{RemoteLogMetadataManager.listRemoteLogSegments()}} to fetch metadata for > all the remote segments and then aggregate the segment sizes by using > {{{}RemoteLogSegmentMetadata.segmentSizeInBytes(){}}}to find the total log > size stored in the remote tier. > The above method involves iterating through all metadata of all the segments > i.e. O({color:#24292f}num_remote_segments{color}) on each execution of RLM > thread. {color:#24292f}Since the main feature of tiered storage is storing a > large amount of data, we expect num_remote_segments to be large and a > frequent linear scan could be expensive (depending on the underlying storage > used by RemoteLogMetadataManager). 
> Segment offloads and segment deletions are run together in the same task and > a fixed size thread pool is shared among all topic-partitions. A slow logic > for calculation of total_log_size could result in the loss of availability as > demonstrated in the following scenario:{color} > # {color:#24292f}Calculation of total_size is slow and the threads in the > thread pool are busy with segment deletions{color} > # Segment offloads are delayed (since they run together with deletions) > # Local disk fills up, since local deletion requires the segment to be > offloaded > # If local disk is completely full, Kafka fails > Details are in KIP - > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-852%3A+Optimize+calculation+of+size+for+log+in+remote+tier] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
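The retention check described in this ticket boils down to a full scan and sum over all remote segment metadata. A minimal, dependency-free Java sketch of that aggregation (the `SegmentMetadata` record below is a simplified stand-in for Kafka's `RemoteLogSegmentMetadata`, not the real class):

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for KIP-405's RemoteLogSegmentMetadata: only the
// segment size is modelled here.
record SegmentMetadata(long segmentSizeInBytes) {}

public class RemoteLogSizeSketch {
    // Naive total_remote_log_size: iterate over ALL remote segment metadata
    // and sum the sizes -- O(num_remote_segments) on every RLM task run,
    // which is exactly the cost KIP-852 sets out to avoid.
    static long totalRemoteLogSize(List<SegmentMetadata> segments) {
        return segments.stream()
                .mapToLong(SegmentMetadata::segmentSizeInBytes)
                .sum();
    }

    public static void main(String[] args) {
        List<SegmentMetadata> segments = Arrays.asList(
                new SegmentMetadata(1_073_741_824L),  // 1 GiB
                new SegmentMetadata(536_870_912L));   // 512 MiB
        System.out.println(totalRemoteLogSize(segments)); // 1610612736
    }
}
```

Because tiered storage is expected to hold many segments, repeating this linear scan on each RLM thread execution is what makes the unavailability scenario above plausible.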
[jira] [Commented] (KAFKA-14038) Optimize calculation of size for log in remote tier
[ https://issues.apache.org/jira/browse/KAFKA-14038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751076#comment-17751076 ] Christo Lolov commented on KAFKA-14038: --- The remaining part of this ticket is blocked on merging https://issues.apache.org/jira/browse/KAFKA-14038 > Optimize calculation of size for log in remote tier > --- > > Key: KAFKA-14038 > URL: https://issues.apache.org/jira/browse/KAFKA-14038 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Divij Vaidya >Assignee: Divij Vaidya >Priority: Major > Labels: KIP-405, needs-kip > Fix For: 3.6.0 > > > {color:#24292f}As per the Tiered Storage feature introduced in > [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage], > users can configure the retention of remote tier based on time, by size, or > both. The work of computing the log segments to be deleted based on the > retention config is [owned by > RemoteLogManager|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-1.RemoteLogManager(RLM)ThreadPool] > (RLM).{color} > {color:#24292f}To compute remote segments eligible for deletion based on > retention by size config, {color}RLM needs to compute the > {{total_remote_log_size}} i.e. the total size of logs available in the remote > tier for that topic-partition. RLM could use the > {{RemoteLogMetadataManager.listRemoteLogSegments()}} to fetch metadata for > all the remote segments and then aggregate the segment sizes by using > {{{}RemoteLogSegmentMetadata.segmentSizeInBytes(){}}}to find the total log > size stored in the remote tier. > The above method involves iterating through all metadata of all the segments > i.e. O({color:#24292f}num_remote_segments{color}) on each execution of RLM > thread. 
{color:#24292f}Since the main feature of tiered storage is storing a > large amount of data, we expect num_remote_segments to be large and a > frequent linear scan could be expensive (depending on the underlying storage > used by RemoteLogMetadataManager). > Segment offloads and segment deletions are run together in the same task and > a fixed size thread pool is shared among all topic-partitions. A slow logic > for calculation of total_log_size could result in the loss of availability as > demonstrated in the following scenario:{color} > # {color:#24292f}Calculation of total_size is slow and the threads in the > thread pool are busy with segment deletions{color} > # Segment offloads are delayed (since they run together with deletions) > # Local disk fills up, since local deletion requires the segment to be > offloaded > # If local disk is completely full, Kafka fails > Details are in KIP - > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-852%3A+Optimize+calculation+of+size+for+log+in+remote+tier] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] fvaleri commented on pull request #14092: KAFKA-15239: Fix system tests using producer performance service
fvaleri commented on PR #14092: URL: https://github.com/apache/kafka/pull/14092#issuecomment-1665393332 > So for example, if the node.version is 1.0.0, this condition will be true, and the script will add DEV jars to CLASSPATH. kafka-run-class.sh then finds the 1.0.0 jars and adds them to CLASSPATH. This shows my lack of knowledge on Kafka STs. I see it now. Thanks. > In https://github.com/apache/kafka/pull/13313 I explored eliminating the artifact mixing, but it turned out that the 0.8.x tools was missing functionality, and so I had to inject the 0.9.x tools jar. I think we can do something similar here, try to remove the artifact mixing, and if that fails, find an intermediate version that can be injected instead of trunk. Yeah, I'll try to do that and give an update. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15303) Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema
[ https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Charles-Eddy updated KAFKA-15303: - Description: Hello everyone, I am currently working on a project that uses Kafka Streams (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK. Our goal is to join offer information from our sellers with additional data from various input topics, and then feed the resulting joined information into an output topic. Our application is deployed in Kubernetes using the StatefulSet feature, with one EBS volume per Kafka Streams pod and 5 Streams Threads per pod. We are using Avro to serialize/deserialize input topics and to store data in the Kafka Streams state stores. We have encountered a bug in Kafka Streams that prevents us from deploying new versions of Kafka Streams containing new compatible Avro schemas of our input topics. The symptom is that after deploying our new version, which contains no changes in topology but only changes to the Avro schema used, we discard every event coming from the right part of the join concerned by these Avro schema changes until we receive something from the left part of the join. As a result, we are losing events and corrupting our output topics and stores with outdated information. After checking the local help for the priority to assign, I have assigned it as *CRITICAL* because we are losing data (for example, tombstones are not propagated to the following joins, so some products are still visible on our website when they should not be). Please feel free to change the priority if you think it is not appropriate. *The bug:* After multiple hours of investigation, we found out that the bug is located in the foreign key join feature, specifically in the class *SubscriptionResolverJoinProcessorSupplier* on the left side of a foreign key join. Its process(...) method
computes a hash from the local store by serializing a value that was deserialized from the left state store, and compares it with the hash of the original message from the subscription-response-topic. This means that when we deploy a new version of our Kafka Streams instance with a new compatible Avro schema on the left side of a join, every join triggered by the right part of the join is invalidated until we receive all the events again on the left side. Every join triggered by the right part of the join is discarded because all the hashes computed by Kafka Streams now differ from those of the original messages. *How to reproduce it:* If we take a working and a non-working workflow, it will do something like this: +Normal non-breaking+ workflow from the left part of the FK join: # A new offer event occurs. The offer is received and stored (v1). # A subscription registration is sent with the offer-hash (v1). # The subscription is saved to the store with the v1 offer-hash. # Product data is searched for. # If product data is found, a subscription response is sent back, including the v1 offer-hash. # The offer data in the store is searched for and the offer hashes between the store (v1) and response event (also v1) are compared. # Finally, the join result is sent. New product event from the right part of the FK join: # The product is received and stored. # All related offers in the registration store are searched for. # A subscription response is sent for each offer, including their offer hash (v1). # The offer data in the store is searched for and the offer hashes between the store (v1) and response event (also v1) are compared. # Finally, the join result is sent. +Breaking workflow:+ The offer serializer is changed to offer v2. New product event from the right part of the FK join: # The product is received and stored. # All related offers in the registration store are searched for. # A subscription response is sent for each offer, including their offer hash (v1).
# The offer data in the store is searched for and the offer hashes between the store (v2) and response event (still v1) are compared. # The join result is not sent since the hash is different. *Potential Fix:* Currently, the offer’s hash is computed from the serialization of deserialized offer data. This means that binary offer data v1 is deserialized into a v2 data class using a v1-compatible deserializer, and then serialized into binary v2 data. As a result, we are comparing a v2 hash (from the store) with a v1 hash (from the response event). Is it possible to avoid this binary v1 -> data class v2 -> binary v2 step by directly retrieving the bytes from the store to compute the hash? This way, the hash would be the same. Thanks. was: Hello everyone, I am currently working on a project that uses Kafka Streams (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK. Our goal is to join offer information from our
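The hash mismatch described in this report can be illustrated with a small, self-contained Java sketch. The v1/v2 "serializers" below are hypothetical stand-ins for compatible Avro serializers, not Kafka Streams code; the hash is a plain array hash rather than the one Streams actually uses:

```java
import java.util.Arrays;

public class FkJoinHashSketch {
    // Toy v1/v2 "serializers" for the same logical value: v2 writes an extra
    // field with a default, so the bytes (and therefore the hash) differ even
    // though the schemas are compatible.
    static byte[] serializeV1(String offer) { return offer.getBytes(); }
    static byte[] serializeV2(String offer) { return (offer + "|newField=0").getBytes(); }

    static int hash(byte[] bytes) { return Arrays.hashCode(bytes); }

    public static void main(String[] args) {
        byte[] storedV1Bytes = serializeV1("offer-42");   // what sits in the state store
        int hashInResponse = hash(storedV1Bytes);         // v1 hash carried in the subscription response

        // What the left side effectively does after the upgrade: deserialize
        // the v1 bytes, re-serialize them with the v2 serializer, then hash.
        // The comparison fails and the join result is dropped.
        int hashAfterRoundTrip = hash(serializeV2("offer-42"));
        System.out.println(hashInResponse == hashAfterRoundTrip); // false

        // The proposed fix: hash the raw bytes from the store directly,
        // skipping the deserialize/re-serialize round trip.
        System.out.println(hash(storedV1Bytes) == hashInResponse); // true
    }
}
```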
[GitHub] [kafka] hudeqi commented on pull request #13929: KAFKA-15129;[3/N] Remove metrics in AbstractFetcherManager when fetcher manager instance shutdown
hudeqi commented on PR #13929: URL: https://github.com/apache/kafka/pull/13929#issuecomment-1665378793 > I still need to get 3.5.1 out and focus on KIP-405 PRs (targeting 3.6 release) before I get some more time @hudeqi! Hi, do you have time to review this PR now? I want to combine all the currently open PRs about metric leaks into one PR. I don't know whether this would help or burden your review. @divijvaidya -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15303) Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema
Charles-Eddy created KAFKA-15303: Summary: Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema Key: KAFKA-15303 URL: https://issues.apache.org/jira/browse/KAFKA-15303 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.4.0 Reporter: Charles-Eddy Hello everyone, I am currently working on a project that uses Kafka Streams (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK. Our goal is to join offer information from our sellers with additional data from various input topics, and then feed the resulting joined information into an output topic. Our application is deployed in Kubernetes using the StatefulSet feature, with one EBS volume per Kafka Streams pod and 5 Streams Threads per pod. We are using Avro to serialize/deserialize input topics and to store data in the Kafka Streams state stores. We have encountered a bug in Kafka Streams that prevents us from deploying new versions of Kafka Streams containing new compatible Avro schemas of our input topics. The symptom is that after deploying our new version, which contains no changes in topology but only changes to the Avro schema used, we discard every event coming from the right part of the join concerned by these Avro schema changes until we receive something from the left part of the join. As a result, we are losing events and corrupting our output topics and stores with outdated information. After checking the local help for the priority to assign, I have assigned it as *CRITICAL* because we are losing data (for example, tombstones are not propagated to the following joins, so some products are still visible on our website when they should not be). Please feel free to change the priority if you think it is not appropriate. 
*The bug:* After multiple hours of investigation, we found out that the bug is located in the foreign key join feature, specifically in the class *SubscriptionResolverJoinProcessorSupplier* on the left side of a foreign key join. Its process(...) method computes a hash from the local store by serializing a value that was deserialized from the left state store, and compares it with the hash of the original message from the subscription-response-topic. This means that when we deploy a new version of our Kafka Streams instance with a new compatible Avro schema on the left side of a join, every join triggered by the right part of the join is invalidated until we receive all the events again on the left side. Every join triggered by the right part of the join is discarded because all the hashes computed by Kafka Streams now differ from those of the original messages. *How to reproduce it:* If we take a working and a non-working workflow, it will do something like this: +Normal non-breaking+ workflow from the left part of the FK join: # A new offer event occurs. The offer is received and stored (v1). # A subscription registration is sent with the offer-hash (v1). # The subscription is saved to the store with the v1 offer-hash. # Product data is searched for. # If product data is found, a subscription response is sent back, including the v1 offer-hash. # The offer data in the store is searched for and the offer hashes between the store (v1) and response event (also v1) are compared. # Finally, the join result is sent. New product event from the right part of the FK join: # The product is received and stored. # All related offers in the registration store are searched for. # A subscription response is sent for each offer, including their offer hash (v1). # The offer data in the store is searched for and the offer hashes between the store (v1) and response event (also v1) are compared. # Finally, the join result is sent. 
+Breaking workflow:+ The offer serializer is changed to offer v2 New product event from the right part of the FK join: # The product is received and stored. # All related offers in the registration store are searched for. # A subscription response is sent for each offer, including their offer hash (v1). # The offer data in the store is searched for and the offer hashes between the store (v2) and response event (still v1) are compared. # The join result is not sent since the hash is different. *Potential Fix:* Currently, the offer’s hash is computed from the serialization of deserialized offer data. This means that binary offer data v1 is deserialized into a v2 data class using a v1-compatible deserializer, and then serialized into binary v2 data. As a result, we are comparing a v2 hash (from the store) with a v1 hash (from the response event). Is it possible to avoid this binary v1 -> data class v2 -> binary v2 step by directly retrieving the bytes from the store to compute the
[GitHub] [kafka] yashmayya commented on pull request #14152: KAFKA-7438: Migrate WindowStoreBuilderTest from EasyMock to Mockito
yashmayya commented on PR #14152: URL: https://github.com/apache/kafka/pull/14152#issuecomment-1665377020 @cadonna @mjsax would you be able to take a look at this whenever you get a chance? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on a diff in pull request #14152: KAFKA-7438: Migrate WindowStoreBuilderTest from EasyMock to Mockito
yashmayya commented on code in PR #14152: URL: https://github.com/apache/kafka/pull/14152#discussion_r1284245654 ## streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java: ## @@ -164,17 +160,6 @@ public void shouldThrowNullPointerIfInnerIsNull() { assertThrows(NullPointerException.class, () -> new WindowStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime())); } -@Test -public void shouldThrowNullPointerIfKeySerdeIsNull() { -assertThrows(NullPointerException.class, () -> new WindowStoreBuilder<>(supplier, null, Serdes.String(), new MockTime())); -} - -@Test -public void shouldThrowNullPointerIfValueSerdeIsNull() { -assertThrows(NullPointerException.class, () -> new WindowStoreBuilder<>(supplier, Serdes.String(), -null, new MockTime())); -} Review Comment: These tests were buggy - there aren't currently any `null` checks in place for key / value serdes (or at least not in the builder). The only reason they were passing earlier is due to the use of `EasyMock` with ["nice" mocks](https://easymock.org/user-guide.html#mocking-nice). In the `setUp` method, the expectation for the `WindowBytesStoreSupplier::name` method is setup - https://github.com/apache/kafka/blob/7782741262c08e5735f7c8e09727ec37cb5f7f02/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java#L61 By default, this method stub covers only the first invocation of the method and subsequent invocations will result in a `null` value being returned (because the mock object is "nice"). 
The first invocation occurs in the `setUp` method itself when the builder is instantiated - https://github.com/apache/kafka/blob/7782741262c08e5735f7c8e09727ec37cb5f7f02/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java#L65-L70 https://github.com/apache/kafka/blob/7782741262c08e5735f7c8e09727ec37cb5f7f02/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java#L39 The `null` check for name here - https://github.com/apache/kafka/blob/7782741262c08e5735f7c8e09727ec37cb5f7f02/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java#L41 is what resulted in the `NullPointerException` being thrown in the `shouldThrowNullPointerIfKeySerdeIsNull` and `shouldThrowNullPointerIfValueSerdeIsNull` tests. This test bug was discovered because method stubbings in `Mockito` are applied to any number of invocations of the method by default. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
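The one-shot-vs-every-call stubbing difference described above can be illustrated without either mocking library. The sketch below hand-rolls both behaviors as plain suppliers; it is an analogy for what the frameworks do by default, not actual EasyMock or Mockito API usage:

```java
import java.util.function.Supplier;

public class StubbingSketch {
    // EasyMock-style one-shot expectation: the stubbed value is returned for
    // the first call only; a "nice" mock then falls back to a default (null).
    static Supplier<String> oneShotStub(String value) {
        final boolean[] used = {false};
        return () -> {
            if (used[0]) return null; // nice-mock default answer
            used[0] = true;
            return value;
        };
    }

    // Mockito-style stubbing: the value is returned for every invocation.
    static Supplier<String> everyCallStub(String value) {
        return () -> value;
    }

    public static void main(String[] args) {
        Supplier<String> easyMockLike = oneShotStub("store-name");
        easyMockLike.get();                     // consumed when the builder is created in setUp()
        System.out.println(easyMockLike.get()); // null -> the name check throws NPE, so the test "passed" for the wrong reason

        Supplier<String> mockitoLike = everyCallStub("store-name");
        mockitoLike.get();
        System.out.println(mockitoLike.get());  // store-name -> the bogus tests fail, exposing the bug
    }
}
```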
[GitHub] [kafka] yashmayya commented on a diff in pull request #14152: KAFKA-7438: Migrate WindowStoreBuilderTest from EasyMock to Mockito
yashmayya commented on code in PR #14152: URL: https://github.com/apache/kafka/pull/14152#discussion_r1284245654 ## streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java: ## @@ -164,17 +160,6 @@ public void shouldThrowNullPointerIfInnerIsNull() { assertThrows(NullPointerException.class, () -> new WindowStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime())); } -@Test -public void shouldThrowNullPointerIfKeySerdeIsNull() { -assertThrows(NullPointerException.class, () -> new WindowStoreBuilder<>(supplier, null, Serdes.String(), new MockTime())); -} - -@Test -public void shouldThrowNullPointerIfValueSerdeIsNull() { -assertThrows(NullPointerException.class, () -> new WindowStoreBuilder<>(supplier, Serdes.String(), -null, new MockTime())); -} Review Comment: These tests were buggy - there aren't currently any `null` checks in place for key / value serdes. The only reason they were passing earlier is due to the use of `EasyMock` with ["nice" mocks](https://easymock.org/user-guide.html#mocking-nice). In the `setUp` method, the expectation for the `WindowBytesStoreSupplier::name` method is setup - https://github.com/apache/kafka/blob/7782741262c08e5735f7c8e09727ec37cb5f7f02/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java#L61 By default, this method stub covers only the first invocation of the method and subsequent invocations will result in a `null` value being returned (because the mock object is "nice"). 
The first invocation occurs in the `setUp` method itself when the builder is instantiated - https://github.com/apache/kafka/blob/7782741262c08e5735f7c8e09727ec37cb5f7f02/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java#L65-L70 https://github.com/apache/kafka/blob/7782741262c08e5735f7c8e09727ec37cb5f7f02/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java#L39 The `null` check for name here - https://github.com/apache/kafka/blob/7782741262c08e5735f7c8e09727ec37cb5f7f02/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java#L41 is what resulted in the `NullPointerException` being thrown in the `shouldThrowNullPointerIfKeySerdeIsNull` and `shouldThrowNullPointerIfValueSerdeIsNull` tests. This test bug was discovered because method stubbings in `Mockito` are applied to any number of invocations of the method by default. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1284241897 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -152,16 +152,42 @@ class UnifiedLog(@volatile var logStartOffset: Long, locally { initializePartitionMetadata() updateLogStartOffset(logStartOffset) +updateLocalLogStartOffset(math.max(logStartOffset, localLog.segments.firstSegmentBaseOffset.getOrElse(0L))) +if (!remoteLogEnabled()) + logStartOffset = localLogStartOffset() maybeIncrementFirstUnstableOffset() initializeTopicId() logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset) + +info(s"Completed load of log with ${localLog.segments.numberOfSegments} segments, local log start offset ${localLogStartOffset()} and " + + s"log end offset $logEndOffset") } def setLogOffsetsListener(listener: LogOffsetsListener): Unit = { logOffsetsListener = listener } + private def updateLocalLogStartOffset(offset: Long): Unit = { Review Comment: This is not required as the updated code does not use this method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
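For reference, the initialization in the diff above amounts to taking the maximum of the log start offset and the first local segment's base offset, defaulting to 0 when there are no local segments. A standalone Java sketch of that rule (the method name below is illustrative, not Kafka's API):

```java
import java.util.OptionalLong;

public class LocalLogStartOffsetSketch {
    // Mirrors the initialization in the UnifiedLog diff:
    // localLogStartOffset = max(logStartOffset, firstSegmentBaseOffset.getOrElse(0L))
    static long initialLocalLogStartOffset(long logStartOffset, OptionalLong firstSegmentBaseOffset) {
        return Math.max(logStartOffset, firstSegmentBaseOffset.orElse(0L));
    }

    public static void main(String[] args) {
        // Remote storage in use: local segments begin beyond the log start offset.
        System.out.println(initialLocalLogStartOffset(100L, OptionalLong.of(500L))); // 500
        // No local segments: fall back to the log start offset itself.
        System.out.println(initialLocalLogStartOffset(100L, OptionalLong.empty())); // 100
    }
}
```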
[GitHub] [kafka] yashmayya opened a new pull request, #14152: KAFKA-7438: Migrate WindowStoreBuilderTest from EasyMock to Mockito
yashmayya opened a new pull request, #14152: URL: https://github.com/apache/kafka/pull/14152 - Part of the larger effort to replace the usage of `EasyMock` and `PowerMock` with `Mockito` across the project that has been underway for the last few years (see https://issues.apache.org/jira/browse/KAFKA-7438) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon opened a new pull request, #14151: KAFKA-15083: add config with "remote.log.metadata" prefix
showuon opened a new pull request, #14151: URL: https://github.com/apache/kafka/pull/14151 When configuring the RLMM, the configs passed into its `configure` method come from `RemoteLogManagerConfig`. But `RemoteLogManagerConfig` contains no configs related to `remote.log.metadata.*`, e.g. `remote.log.metadata.topic.replication.factor`. So even if users have set the config on the broker, it is never applied. This PR fixes the issue by allowing users to pass `remote.log.metadata.*` configs into the RLMM, including the `remote.log.metadata.common.client.`/`remote.log.metadata.producer.`/`remote.log.metadata.consumer.` prefixes. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
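The prefix handling this PR describes is similar in spirit to what Kafka's `AbstractConfig#originalsWithPrefix` provides. The helper below is a simplified, hypothetical stand-in showing how `remote.log.metadata.*` entries could be selected out of the broker properties before being handed to `RemoteLogMetadataManager#configure(...)`:

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixConfigSketch {
    // Select every entry under `prefix`, optionally stripping the prefix,
    // so the resulting map can be passed to the RLMM's configure(...) method.
    static Map<String, Object> withPrefix(Map<String, Object> props, String prefix, boolean strip) {
        Map<String, Object> out = new HashMap<>();
        for (Map.Entry<String, Object> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix) && e.getKey().length() > prefix.length()) {
                out.put(strip ? e.getKey().substring(prefix.length()) : e.getKey(), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> brokerProps = Map.of(
                "remote.log.metadata.topic.replication.factor", "3",
                "remote.log.metadata.consumer.fetch.max.bytes", "1048576",
                "log.retention.bytes", "-1");
        // Only the entries under "remote.log.metadata." reach the RLMM; other
        // broker configs are filtered out.
        System.out.println(withPrefix(brokerProps, "remote.log.metadata.", false).size()); // 2
    }
}
```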
[GitHub] [kafka] satishd commented on pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
satishd commented on PR #13984: URL: https://github.com/apache/kafka/pull/13984#issuecomment-1665334207 Will merge if the Jenkins jobs do not fail with any related tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-15295) Add config validation when remote storage is enabled on a topic
[ https://issues.apache.org/jira/browse/KAFKA-15295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash reassigned KAFKA-15295: Assignee: Luke Chen (was: Kamal Chandraprakash) > Add config validation when remote storage is enabled on a topic > --- > > Key: KAFKA-15295 > URL: https://issues.apache.org/jira/browse/KAFKA-15295 > Project: Kafka > Issue Type: Sub-task >Reporter: Kamal Chandraprakash >Assignee: Luke Chen >Priority: Major > Fix For: 3.6.0 > > > If system level remote storage is not enabled, then enabling remote storage > on a topic should throw exception while validating the configs. > See https://github.com/apache/kafka/pull/14114#discussion_r1280372441 for > more details -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15295) Add config validation when remote storage is enabled on a topic
[ https://issues.apache.org/jira/browse/KAFKA-15295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751054#comment-17751054 ] Kamal Chandraprakash commented on KAFKA-15295: -- [~showuon] I'm not working on this one currently. Thanks for taking it forward! > Add config validation when remote storage is enabled on a topic > --- > > Key: KAFKA-15295 > URL: https://issues.apache.org/jira/browse/KAFKA-15295 > Project: Kafka > Issue Type: Sub-task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > Fix For: 3.6.0 > > > If system level remote storage is not enabled, then enabling remote storage > on a topic should throw exception while validating the configs. > See https://github.com/apache/kafka/pull/14114#discussion_r1280372441 for > more details -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14598) Fix flaky ConnectRestApiTest
[ https://issues.apache.org/jira/browse/KAFKA-14598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ashwin Pankaj resolved KAFKA-14598. --- Resolution: Fixed Did not observe this recently > Fix flaky ConnectRestApiTest > > > Key: KAFKA-14598 > URL: https://issues.apache.org/jira/browse/KAFKA-14598 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Ashwin Pankaj >Assignee: Ashwin Pankaj >Priority: Minor > Labels: flaky-test > > ConnectRestApiTest sometimes fails with the message > {{ConnectRestError(404, '\n\n content="text/html;charset=ISO-8859-1"/>\nError 404 Not > Found\n\nHTTP ERROR 404 Not > Found\n\nURI:/connector-plugins/\nSTATUS:404\nMESSAGE:Not > > Found\nSERVLET:-\n\n\n\n\n', > 'http://172.31.1.75:8083/connector-plugins/')}} > This happens because ConnectDistributedService.start() by default waits till > the line > {{Joined group at generation ..}} is visible in the logs. > In most cases this is sufficient. But in the cases where the test fails, we > see that this message appears even before Connect RestServer has finished > initialization. 
> {quote} - [2022-12-15 15:40:29,064] INFO [Worker clientId=connect-1, > groupId=connect-cluster] Joined group at generation 2 with protocol version 1 > and got assignment: Assignment{error=0, > leader='connect-1-07d9da63-9acb-4633-aee4-1ab79f4ab1ae', > leaderUrl='http://worker34:8083/', offset=-1, connectorIds=[], taskIds=[], > revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > - [2022-12-15 15:40:29,560] INFO 172.31.5.66 - - [15/Dec/2022:15:40:29 > +] "GET /connector-plugins/ HTTP/1.1" 404 375 "-" > "python-requests/2.24.0" 71 (org.apache.kafka.connect.runtime.rest.RestServer) > - [2022-12-15 15:40:29,579] INFO REST resources initialized; server is > started and ready to handle requests > (org.apache.kafka.connect.runtime.rest.RestServer) > {quote} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
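The root cause above is a readiness signal (a log line) that can fire before the REST server is actually up. A generic poll-until-ready helper, sketched in plain Java with an injectable probe; in the real system test the probe would be an HTTP GET against `/connector-plugins/` returning 200, which is assumed here rather than shown:

```java
import java.util.function.BooleanSupplier;

public class ReadinessPollSketch {
    // Poll a readiness probe until it succeeds or the timeout elapses,
    // instead of relying on the "Joined group at generation .." log line,
    // which can appear before the Connect RestServer finishes initializing.
    static boolean waitUntil(BooleanSupplier ready, long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (ready.getAsBoolean()) return true;
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return ready.getAsBoolean();
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Fake probe that becomes ready after ~50 ms, standing in for the HTTP check.
        BooleanSupplier probe = () -> System.currentTimeMillis() - start > 50;
        System.out.println(waitUntil(probe, 2000, 10)); // true
    }
}
```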
[jira] [Commented] (KAFKA-14598) Fix flaky ConnectRestApiTest
[ https://issues.apache.org/jira/browse/KAFKA-14598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751051#comment-17751051 ] Ashwin Pankaj commented on KAFKA-14598: --- Did not see this occurring recently - closing this issue. > Fix flaky ConnectRestApiTest > > > Key: KAFKA-14598 > URL: https://issues.apache.org/jira/browse/KAFKA-14598 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Ashwin Pankaj >Assignee: Ashwin Pankaj >Priority: Minor > Labels: flaky-test > > ConnectRestApiTest sometimes fails with the message > {{ConnectRestError(404, '\n\n content="text/html;charset=ISO-8859-1"/>\nError 404 Not > Found\n\nHTTP ERROR 404 Not > Found\n\nURI:/connector-plugins/\nSTATUS:404\nMESSAGE:Not > > Found\nSERVLET:-\n\n\n\n\n', > 'http://172.31.1.75:8083/connector-plugins/')}} > This happens because ConnectDistributedService.start() by default waits till > the line > {{Joined group at generation ..}} is visible in the logs. > In most cases this is sufficient. But in the cases where the test fails, we > see that this message appears even before Connect RestServer has finished > initialization. 
> {quote} - [2022-12-15 15:40:29,064] INFO [Worker clientId=connect-1, > groupId=connect-cluster] Joined group at generation 2 with protocol version 1 > and got assignment: Assignment{error=0, > leader='connect-1-07d9da63-9acb-4633-aee4-1ab79f4ab1ae', > leaderUrl='http://worker34:8083/', offset=-1, connectorIds=[], taskIds=[], > revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > - [2022-12-15 15:40:29,560] INFO 172.31.5.66 - - [15/Dec/2022:15:40:29 > +] "GET /connector-plugins/ HTTP/1.1" 404 375 "-" > "python-requests/2.24.0" 71 (org.apache.kafka.connect.runtime.rest.RestServer) > - [2022-12-15 15:40:29,579] INFO REST resources initialized; server is > started and ready to handle requests > (org.apache.kafka.connect.runtime.rest.RestServer) > {quote} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15083) Passing "remote.log.metadata.*" configs into RLMM
[ https://issues.apache.org/jira/browse/KAFKA-15083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751036#comment-17751036 ] Luke Chen commented on KAFKA-15083:
-----------------------------------

[~satish.duggana] , sorry that I didn't get the ping last time. The issue here is: when configuring RLMM, what gets passed into the `configure` method is the `RemoteLogManagerConfig`. But `RemoteLogManagerConfig` contains no configs related to `remote.log.metadata.*`, e.g. `remote.log.metadata.topic.replication.factor`. So even if users have set the config on the broker, it will never be applied. Working on a patch now.

> Passing "remote.log.metadata.*" configs into RLMM
> -------------------------------------------------
>
>                 Key: KAFKA-15083
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15083
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Luke Chen
>            Assignee: Luke Chen
>            Priority: Major
>
> Based on the [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
> {{remote.log.metadata.*}}: Default RLMM implementation creates producer and consumer instances. Common client properties can be configured with the `remote.log.metadata.common.client.` prefix. Users can also pass properties specific to producer/consumer with the `remote.log.metadata.producer.` and `remote.log.metadata.consumer.` prefixes. These will override properties with the `remote.log.metadata.common.client.` prefix.
> Any other properties should be prefixed with "remote.log.metadata." and these will be passed to RemoteLogMetadataManager#configure(Map props).
> For ex: Security configuration to connect to the local broker for the listener name configured are passed with props.
>
> This is missed from the current implementation.
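The fix direction described above amounts to collecting every broker property that carries the `remote.log.metadata.` prefix and handing the resulting map to the RLMM's `configure` method. A simplified sketch of that prefix filtering (an illustration, not the actual `RemoteLogManagerConfig` code):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch: gather broker properties carrying the
// "remote.log.metadata." prefix so they can be forwarded to
// RemoteLogMetadataManager#configure. Illustrative, not the real Kafka code.
public class RemoteLogMetadataProps {

    /** Returns all entries whose key starts with the given prefix, keys kept verbatim. */
    public static Map<String, Object> propsWithPrefix(Map<String, Object> brokerProps, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> entry : brokerProps.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> brokerProps = new HashMap<>();
        brokerProps.put("remote.log.metadata.topic.replication.factor", "3");
        brokerProps.put("log.retention.bytes", "1073741824");
        Map<String, Object> rlmmProps = propsWithPrefix(brokerProps, "remote.log.metadata.");
        System.out.println(rlmmProps.keySet()); // prints [remote.log.metadata.topic.replication.factor]
    }
}
```

Kafka's own `AbstractConfig` already offers prefix-based helpers for this kind of extraction, so a real patch would likely build on those rather than hand-rolling a loop.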
[jira] [Commented] (KAFKA-15267) Cluster-wide disablement of Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751033#comment-17751033 ] Luke Chen commented on KAFKA-15267:
-----------------------------------

Do we have any update on this issue? I think this is a blocker for v3.6.0, do you agree?

> Cluster-wide disablement of Tiered Storage
> ------------------------------------------
>
>                 Key: KAFKA-15267
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15267
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Christo Lolov
>            Assignee: Christo Lolov
>            Priority: Major
>              Labels: tiered-storage
>
> h2. Summary
> KIP-405 defines the configuration {{remote.log.storage.system.enable}} which controls whether all resources needed for Tiered Storage to function are instantiated properly in Kafka. However, the interaction between remote data and Kafka if that configuration is set to false while there are still topics with {{remote.storage.enable}} is undefined. *We would like to give customers the ability to switch off Tiered Storage on a cluster level and as such would need to define the behaviour.*
> {{remote.log.storage.system.enable}} is a read-only configuration. This means that it can only be changed by *modifying the server.properties* and restarting brokers. As such, the *validity of values contained in it is only checked at broker startup*.
> This JIRA proposes a few behaviours and a recommendation on a way forward.
> h2. Option 1: Change nothing
> Pros:
> * No operation.
> Cons:
> * We do not solve the problem of moving back to older (or newer) Kafka versions not supporting TS.
> h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster level and do not allow it to be disabled
> Always instantiate all resources for tiered storage. If no special ones are selected, use the default ones which come with Kafka.
> Pros:
> * We solve the problem of moving between versions that do not allow TS to be disabled.
> Cons:
> * We do not solve the problem of moving back to older (or newer) Kafka versions not supporting TS.
> * We haven't quantified how much computer resources (CPU, memory) idle TS components occupy.
> * TS is a feature not required for running Kafka. As such, while it is still under development we shouldn't put it on the critical path of starting a broker. In this way, a stray memory leak won't impact anything on the critical path of a broker.
> * We are potentially swapping one problem for another. How does TS behave if one decides to swap the TS plugin classes when data has already been written?
> h2. Option 3: Hide topics with tiering enabled
> Customers cannot interact with topics which have tiering enabled. They cannot create new topics with the same names. Retention (and compaction?) do not take effect on files already in local storage.
> Pros:
> * We do not force data deletion.
> Cons:
> * This will be quite involved - the controller will need to know when a broker's server.properties have been altered; the broker will need to not proceed to delete logs it is not the leader or follower for.
> h2. Option 4: Do not start the broker if there are topics with tiering enabled - Recommended
> This option has 2 different sub-options. The first one is that TS cannot be disabled on cluster level if there are *any* tiered topics - in other words, all tiered topics need to be deleted. The second one is that TS cannot be disabled on a cluster level if there are *any* topics with *tiering enabled* - they can have tiering disabled, but with a retention policy set to delete or retain (as per [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]). A topic can have tiering disabled and remain on the cluster as long as there is no *remote* data when TS is disabled cluster-wide.
> Pros:
> * We force the customer to be very explicit in disabling tiering of topics prior to disabling TS on the whole cluster.
> Cons:
> * You have to make certain that all data in remote is deleted (just disablement of a tiered topic is not enough). How do you determine whether all remote data has expired if the policy is retain? If the retain policy in KIP-950 knows that there is data in remote, then this should also be able to figure it out.
> The common denominator is that there needs to be no *remote* data at the point of disabling TS. As such, the most straightforward option is to refuse to start brokers if there are topics with {{remote.storage.enabled}} present. This in essence requires customers to clean any tiered topics before switching off TS, which is a fair ask. Should we wish to revise this later, it should be possible.
> h2. Option 5: Make Kafka forget about all remote
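The recommended Option 4 boils down to a startup-time scan: with tiered storage disabled cluster-wide, refuse to start while any topic still has remote storage enabled. A hedged sketch of that check, with illustrative names that are not the actual broker code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hedged sketch of Option 4 (illustrative names, not the actual Kafka broker
// code): when remote.log.storage.system.enable=false, refuse to start while
// any topic still carries remote.storage.enable=true.
public class TieredStorageStartupCheck {

    public static void validate(boolean systemRemoteStorageEnabled,
                                Map<String, Boolean> remoteStorageEnabledByTopic) {
        if (systemRemoteStorageEnabled) {
            return; // Nothing to check while tiered storage stays enabled.
        }
        List<String> offenders = new ArrayList<>();
        for (Map.Entry<String, Boolean> entry : remoteStorageEnabledByTopic.entrySet()) {
            if (Boolean.TRUE.equals(entry.getValue())) {
                offenders.add(entry.getKey());
            }
        }
        if (!offenders.isEmpty()) {
            throw new IllegalStateException(
                    "Tiered storage is disabled cluster-wide but these topics still have "
                            + "remote.storage.enable=true: " + offenders);
        }
    }
}
```

Failing fast at startup with an explicit topic list matches the ticket's "fair ask": operators see exactly which topics must be cleaned up before TS can be switched off.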
[GitHub] [kafka] satishd commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
satishd commented on code in PR #13984: URL: https://github.com/apache/kafka/pull/13984#discussion_r1284146317

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
@@ -623,10 +632,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
             producerStateSnapshotFile.toPath(), leaderEpochsIndex);
         brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark();
         brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
-        remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+        Optional customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);

         RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+                customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+        if (customMetadata.isPresent()) {
+            long customMetadataSize = customMetadata.get().value().length;
+            if (customMetadataSize > this.customMetadataSizeLimit) {
+                CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException();
+                logger.error("Custom metadata size {} exceeds configured limit {}." +
+                        " Copying will be stopped and copied segment will be attempted to clean." +
+                        " Original metadata: {}",
+                        customMetadataSize, this.customMetadataSizeLimit, copySegmentStartedRlsm, e);
+                try {
+                    // For deletion, we provide back the custom metadata by creating a new metadata object from the update.
+                    // However, the update itself will not be stored in this case.
+                    remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));

Review Comment:
    That looks reasonable to me.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
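The essence of the diff discussed above is a size guard on the plugin-returned custom metadata: measure it against a configured limit before committing the segment copy, and delete the copied segment again on overflow. A simplified sketch of just that decision (illustrative names, not the actual RemoteLogManager API):

```java
import java.util.Optional;

// Simplified sketch of the size check from the diff above: the custom metadata
// returned by the remote storage plugin is measured against a configured limit
// before the segment copy is committed; on overflow the copied segment would
// be deleted again. Illustrative names, not the actual RemoteLogManager API.
public class CustomMetadataGuard {

    /** Returns true when the (optional) metadata fits within the limit and the copy may proceed. */
    public static boolean fitsWithinLimit(Optional<byte[]> customMetadata, int limitBytes) {
        return customMetadata.map(value -> value.length <= limitBytes).orElse(true);
    }

    public static void main(String[] args) {
        System.out.println(fitsWithinLimit(Optional.empty(), 128));           // prints true
        System.out.println(fitsWithinLimit(Optional.of(new byte[256]), 128)); // prints false
    }
}
```

Treating absent metadata as always fitting mirrors the diff, where the whole check is skipped unless `customMetadata.isPresent()`.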
[jira] [Commented] (KAFKA-15295) Add config validation when remote storage is enabled on a topic
[ https://issues.apache.org/jira/browse/KAFKA-15295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751031#comment-17751031 ] Luke Chen commented on KAFKA-15295:
-----------------------------------

[~ckamal] , are you still working on this? If you don't have time, I can work on it. Please let me know. Thanks.

> Add config validation when remote storage is enabled on a topic
> ---------------------------------------------------------------
>
>                 Key: KAFKA-15295
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15295
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Kamal Chandraprakash
>            Assignee: Kamal Chandraprakash
>            Priority: Major
>             Fix For: 3.6.0
>
> If system level remote storage is not enabled, then enabling remote storage on a topic should throw an exception while validating the configs.
> See https://github.com/apache/kafka/pull/14114#discussion_r1280372441 for more details
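The validation this ticket asks for is a simple cross-check at topic create/alter time. A hedged sketch of the rule (illustrative names, not the actual LogConfig validation code):

```java
// Hedged sketch of the validation requested in this ticket (illustrative
// names, not the actual Kafka LogConfig code): enabling remote storage on a
// topic should fail fast when tiered storage is not enabled system-wide.
public class TopicRemoteStorageValidator {

    public static void validate(boolean systemRemoteStorageEnabled,
                                boolean topicRemoteStorageEnabled) {
        if (topicRemoteStorageEnabled && !systemRemoteStorageEnabled) {
            throw new IllegalArgumentException(
                    "Cannot enable remote.storage.enable on a topic while "
                            + "remote.log.storage.system.enable=false on the broker");
        }
    }

    public static void main(String[] args) {
        validate(true, true);   // OK: system and topic both enabled
        validate(false, false); // OK: remote storage unused
        System.out.println("validations passed"); // prints validations passed
    }
}
```

In the real broker this would surface as a config exception during topic creation or config alteration rather than a generic `IllegalArgumentException`.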
[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on code in PR #13984: URL: https://github.com/apache/kafka/pull/13984#discussion_r1284145024

## storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java:
@@ -66,6 +68,11 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
      */
     private final int segmentSizeInBytes;

+    /**
+     * Custom metadata.

Review Comment:
    Thanks, updated
[GitHub] [kafka] showuon commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
showuon commented on code in PR #13984: URL: https://github.com/apache/kafka/pull/13984#discussion_r1284105567

## storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
@@ -81,6 +81,13 @@ public final class RemoteLogManagerConfig {
     public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC = "Listener name of the local broker to which it should get connected if " +
             "needed by RemoteLogMetadataManager implementation.";

+    public static final String REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_SIZE_PROP = "remote.log.metadata.custom.metadata.max.size";

Review Comment:
    No concern. Thanks.
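For context, the new property would be set in the broker's server.properties alongside the other tiered-storage settings. The name and value below are taken from this PR revision and are a hedged example only; the exact name and default may have changed before the final merge:

```properties
# Hedged example based on the diff above; exact property name and default
# may differ in the released version.
remote.log.storage.system.enable=true
remote.log.metadata.custom.metadata.max.size=128
```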
[GitHub] [kafka] cadonna merged pull request #14145: KAFKA-10199: Change to RUNNING if no pending task to recycle exist
cadonna merged PR #14145: URL: https://github.com/apache/kafka/pull/14145
[GitHub] [kafka] cadonna commented on pull request #14145: KAFKA-10199: Change to RUNNING if no pending task to recycle exist
cadonna commented on PR #14145: URL: https://github.com/apache/kafka/pull/14145#issuecomment-1665114431

Build failures are unrelated:
```
Build / JDK 17 and Scala 2.13 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOffsetSyncsTopicsOnTarget()
Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft
Build / JDK 8 and Scala 2.12 / org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription(boolean).hasConsumerRack = false
Build / JDK 8 and Scala 2.12 / org.apache.kafka.clients.consumer.StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription(boolean).hasConsumerRack = false
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.runtime.rest.ConnectRestServerTest.testCORSDisabled
Build / JDK 20 and Scala 2.13 / org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription(boolean).hasConsumerRack = false
Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
Build / JDK 20 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
Build / JDK 20 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
```
[GitHub] [kafka] showuon commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions
showuon commented on code in PR #14127: URL: https://github.com/apache/kafka/pull/14127#discussion_r1284009383

## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
@@ -64,302 +65,387 @@ class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);

-    private static final long POLL_INTERVAL_MS = 100L;
+    static long pollIntervalMs = 100L;

     private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-    private final KafkaConsumer consumer;
-    private final String metadataTopicName;
+    private final Consumer consumer;
     private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-    private final Time time;
+    private final Time time = new SystemTime();

+    // TODO - Update comments below
     // It indicates whether the closing process has been started or not. If it is set as true,
     // consumer will stop consuming messages, and it will not allow partition assignments to be updated.
-    private volatile boolean closing = false;
+    private volatile boolean isClosed = false;

     // It indicates whether the consumer needs to assign the partitions or not. This is set when it is
     // determined that the consumer needs to be assigned with the updated partitions.
-    private volatile boolean assignPartitions = false;
+    private volatile boolean isAssignmentChanged = true;

     // It represents a lock for any operations related to the assignedTopicPartitions.
     private final Object assignPartitionsLock = new Object();

     // Remote log metadata topic partitions that consumer is assigned to.
-    private volatile Set assignedMetaPartitions = Collections.emptySet();
+    private volatile Set assignedMetadataPartitions = Collections.emptySet();

     // User topic partitions that this broker is a leader/follower for.
-    private Set assignedTopicPartitions = Collections.emptySet();
+    private volatile Map assignedUserTopicIdPartitions = Collections.emptyMap();
+    private volatile Set processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();

-    // Map of remote log metadata topic partition to consumed offsets. Received consumer records
-    // may or may not have been processed based on the assigned topic partitions.
-    private final Map partitionToConsumedOffsets = new ConcurrentHashMap<>();
+    private long uninitializedAt = time.milliseconds();
+    private boolean isAllUserTopicPartitionsInitialized;

-    // Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile.
-    private Map lastSyncedPartitionToConsumedOffsets = Collections.emptyMap();
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map readOffsetsByMetadataPartition = new ConcurrentHashMap<>();
+    private final Map readOffsetsByUserTopicPartition = new HashMap<>();

-    private final long committedOffsetSyncIntervalMs;
-    private CommittedOffsetsFile committedOffsetsFile;
-    private long lastSyncedTimeMs;
+    private Map offsetHolderByMetadataPartition = new HashMap<>();
+    private boolean isOffsetsFetchFailed = false;
+    private long lastFailedFetchOffsetsTimestamp;

-    public ConsumerTask(KafkaConsumer consumer,
-                        String metadataTopicName,
-                        RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+    public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
                         RemoteLogMetadataTopicPartitioner topicPartitioner,
-                        Path committedOffsetsPath,
-                        Time time,
-                        long committedOffsetSyncIntervalMs) {
-        this.consumer = Objects.requireNonNull(consumer);
-        this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+                        Function, Consumer> consumerSupplier) {
+        this.consumer = consumerSupplier.apply(Optional.empty());
         this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler);
         this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-        this.time = Objects.requireNonNull(time);
-        this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-        initializeConsumerAssignment(committedOffsetsPath);
-    }
-
-    private void initializeConsumerAssignment(Path committedOffsetsPath) {
-        try {
-            committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile());
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-
-        Map committedOffsets = Collections.emptyMap();
-        try {
-            // Load committed offset and
[GitHub] [kafka] lihaosky opened a new pull request, #14150: KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer
lihaosky opened a new pull request, #14150: URL: https://github.com/apache/kafka/pull/14150

### Description
1. Add configs for rack aware assignor
2. Update standby optimizer in `RackAwareTaskAssignor` to have more rounds
3. Refactor some methods in `RackAwareTaskAssignorTest` into test utils so that they can also be used in `HighAvailabilityTaskAssignorTest` and other tests

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] satishd commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
satishd commented on code in PR #13984: URL: https://github.com/apache/kafka/pull/13984#discussion_r1284030338

## storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java:
@@ -66,6 +68,11 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
      */
     private final int segmentSizeInBytes;

+    /**
+     * Custom metadata.

Review Comment:
    I think having that in `CustomMetadata` makes sense.