[GitHub] [kafka] kamalcph commented on a diff in pull request #14151: KAFKA-15083: add config with "remote.log.metadata" prefix

2023-08-04 Thread via GitHub


kamalcph commented on code in PR #14151:
URL: https://github.com/apache/kafka/pull/14151#discussion_r1284973824


##
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##
@@ -134,6 +134,8 @@ public final class RemoteLogManagerConfig {
 "less than or equal to `log.retention.bytes` value.";
 public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;
 
+public static final String REMOTE_LOG_METADATA_PREFIX = "remote.log.metadata";

Review Comment:
   Should we append the `PROP` (or) `CONFIG` suffix to it?



##
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##
@@ -134,6 +134,8 @@ public final class RemoteLogManagerConfig {
 "less than or equal to `log.retention.bytes` value.";
 public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;
 
+public static final String REMOTE_LOG_METADATA_PREFIX = "remote.log.metadata";

Review Comment:
   Should we append the `_PROP` (or) `_CONFIG` suffix to it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-12890) Consumer group stuck in `CompletingRebalance`

2023-08-04 Thread Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751269#comment-17751269
 ] 

Yang commented on KAFKA-12890:
--

Hi [~dajac], I know this issue and PR have been closed for a while. I just 
wonder whether you have any broker-side logs for this issue that you can share. 
Thanks!

> Consumer group stuck in `CompletingRebalance`
> -
>
> Key: KAFKA-12890
> URL: https://issues.apache.org/jira/browse/KAFKA-12890
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Blocker
> Fix For: 2.8.1, 3.0.0
>
>
> We have recently seen multiple consumer groups stuck in 
> `CompletingRebalance`. It appears that those groups never receive the 
> assignment from the leader of the group and remain stuck in this state 
> forever.
> When a group transitions to the `CompletingRebalance` state, the group 
> coordinator sets up a `DelayedHeartbeat` for each member of the group. It does 
> so to ensure that the member sends a sync request within the session timeout. 
> If it does not, the group coordinator rebalances the group. Note that 
> `DelayedHeartbeat` is reused here for this purpose. `DelayedHeartbeat`s are 
> also completed when a member heartbeats.
> The issue is that https://github.com/apache/kafka/pull/8834 changed the 
> heartbeat logic to allow members to heartbeat while the group is in the 
> `CompletingRebalance` state. This was not allowed before. Now, if a member 
> starts to heartbeat while the group is in `CompletingRebalance`, the 
> heartbeat request completes the pending `DelayedHeartbeat` that 
> was set up previously to catch a missing sync request. Therefore, 
> if the sync request never comes, the group coordinator no longer notices.
> We need to bring that behavior back somehow.
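
The failure mode described in the quoted issue can be illustrated with a toy model. This is only a sketch and not the group coordinator's code: the class, the `Member` fields, and both method names are hypothetical, and the session timeout is reduced to a single boolean check.

```java
public class PendingSyncSketch {
    /** Toy stand-in for one group member while the group is in CompletingRebalance. */
    public static final class Member {
        boolean syncReceived = false;    // has the member sent its sync request?
        boolean deadlinePending = true;  // is the delayed operation still armed?
    }

    /** Behaviour described in the ticket: any heartbeat completes the pending operation. */
    public static void onHeartbeat(Member m) {
        m.deadlinePending = false;
    }

    /** The missing sync is only noticed if the operation is still armed when the timeout fires. */
    public static boolean missingSyncNoticedAtTimeout(Member m) {
        return !m.syncReceived && m.deadlinePending;
    }

    public static void main(String[] args) {
        Member silent = new Member();
        System.out.println(missingSyncNoticedAtTimeout(silent));       // true

        Member heartbeating = new Member();
        onHeartbeat(heartbeating);                                      // heartbeat, but no sync
        System.out.println(missingSyncNoticedAtTimeout(heartbeating)); // false
    }
}
```

In this model the second member never syncs, yet nothing is left to notice it, which is the stuck state the issue reports.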



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15309) Add custom error handler to Producer

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15309:

Description: 
The producer collects multiple records into batches, and a single 
record-specific error might fail the whole batch (e.g., `RecordTooLargeException`).

This ticket suggests adding a per-record error handler that allows users to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams' `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.

Another case in which a production exception handler could be useful is when a 
user tries to write into a non-existing topic, which returns a retryable error 
code; with infinite retries the producer would hang retrying forever. A handler 
could help to break the infinite retry loop.

  was:
The producer collects multiple records into batches, and a single record 
specific error might fail the whole batch (eg, `RecordTooLargeException`).

This ticket suggests to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.

Another example for which a production exception handler would be useful, if a 
user tries to write into a non-existing topic, which returns a retryable error 
code; with infinite retries the producer would hang retrying forever.


> Add custom error handler to Producer
> 
>
> Key: KAFKA-15309
> URL: https://issues.apache.org/jira/browse/KAFKA-15309
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> The producer collects multiple records into batches, and a single 
> record-specific error might fail the whole batch (e.g., `RecordTooLargeException`).
> This ticket suggests adding a per-record error handler that allows users to 
> opt into skipping bad records without failing the whole batch (similar to 
> Kafka Streams' `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
> https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.
> Another case in which a production exception handler could be useful is when 
> a user tries to write into a non-existing topic, which returns a retryable 
> error code; with infinite retries the producer would hang retrying forever. A 
> handler could help to break the infinite retry loop.
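
As a rough illustration of what a per-record handler could look like, here is a minimal sketch. The ticket does not define an API (it is labelled needs-kip), so `Response`, `RecordErrorHandler`, `buildBatch`, and the toy size limit are all hypothetical names chosen for this example, and the "batch" is just a list of strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PerRecordHandlerSketch {
    /** Hypothetical handler decision; the ticket does not define an API. */
    public enum Response { FAIL, SKIP }

    /** Hypothetical per-record handler interface. */
    public interface RecordErrorHandler {
        Response handle(String record, Exception error);
    }

    /** Toy size limit standing in for a record-specific error condition. */
    static final int MAX_RECORD_CHARS = 8;

    /** Toy batching step: returns the records that are kept in the batch. */
    public static List<String> buildBatch(List<String> records, RecordErrorHandler handler) {
        List<String> batch = new ArrayList<>();
        for (String record : records) {
            if (record.length() > MAX_RECORD_CHARS) {
                Exception error = new IllegalArgumentException(
                        "record too large: " + record.length() + " chars");
                if (handler.handle(record, error) == Response.FAIL) {
                    // Without an opt-in, one bad record fails the whole batch.
                    throw new IllegalStateException("batch failed", error);
                }
                continue; // SKIP: drop only the bad record
            }
            batch.add(record);
        }
        return batch;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "this-record-is-too-large", "b");
        System.out.println(buildBatch(records, (record, error) -> Response.SKIP)); // [a, b]
    }
}
```

A handler that returns `FAIL` reproduces the current all-or-nothing behaviour, so skipping stays an explicit opt-in.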





[jira] [Updated] (KAFKA-15309) Add custom error handler to Producer

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15309:

Description: 
The producer collects multiple records into batches, and a single record 
specific error might fail the whole batch (eg, `RecordTooLargeException`).

This ticket suggests to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.

Another example for which a production exception handler would be useful, if a 
user tries to write into a non-existing topic, which returns a retryable error 
code; with infinite retries the producer would hang retrying forever.

  was:
The producer collects multiple records into batches, and a single record 
specific error might fail the whole batch (eg, `RecordTooLargeException`).

This ticket suggest to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.




[jira] [Updated] (KAFKA-15309) Add custom error handler to Producer

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15309:

Description: 
The producer collects multiple records into batches, and a single record 
specific error might fail the whole batch (eg, `RecordTooLargeException`).

This ticket suggest to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.

  was:
The producer batches up multiple records into batches, and a single record 
specific error might fail the whole batch.

This ticket suggest to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.




[jira] [Updated] (KAFKA-15309) Add custom error handler to Producer

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15309:

Description: 
The producer batches up multiple records into batches, and a single record 
specific error might fail the whole batch.

This ticket suggest to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.

  was:
The producer batches up multiple records into batches, and a single record 
specific error might fail the whole batch.

This ticket suggest to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`.




[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once

2023-08-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751267#comment-17751267
 ] 

Matthias J. Sax commented on KAFKA-15259:
-

I synced up with [~cegerton], who worked on 
https://issues.apache.org/jira/browse/KAFKA-9279, and we came up with this 
idea: adding a "production exception handler" to the producer that would allow 
KS to tell the producer not to fail the TX but to skip the record: 
https://issues.apache.org/jira/browse/KAFKA-15309 

If we cannot do K15309, an alternative might be to add an internal producer 
config that allows Kafka Streams to disable the proactive abort of a TX. This 
would be safe, because Kafka Streams is actually a good citizen and calls 
`producer.flush()` and evaluates all callbacks before trying to commit – the 
issue K9279 addresses is actually bad user behavior: not checking for async 
errors before committing.
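
The "good citizen" pattern mentioned above (flush, evaluate every callback, and only then commit) can be sketched as follows. To keep the example self-contained it uses a hypothetical `TxProducer` interface and an in-memory `FakeProducer` rather than the Kafka client API; those names, the 8-character limit, and `sendAllThenCommit` are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class FlushBeforeCommitSketch {
    /** Hypothetical stand-in for a transactional producer; NOT the Kafka client API. */
    public interface TxProducer {
        void send(String record, Consumer<Exception> callback); // callback gets null on success
        void flush();                                           // runs all outstanding callbacks
        void commit();
        void abort();
    }

    /** In-memory fake: records longer than 8 characters fail asynchronously. */
    public static final class FakeProducer implements TxProducer {
        private final List<Runnable> pending = new ArrayList<>();
        public boolean committed = false;
        public boolean aborted = false;

        public void send(String record, Consumer<Exception> callback) {
            Exception error = record.length() > 8 ? new IllegalArgumentException("too large") : null;
            pending.add(() -> callback.accept(error));
        }
        public void flush() { pending.forEach(Runnable::run); pending.clear(); }
        public void commit() { committed = true; }
        public void abort() { aborted = true; }
    }

    /** Sends all records, flushes, checks the collected errors, and only then commits. */
    public static boolean sendAllThenCommit(TxProducer producer, List<String> records) {
        List<Exception> errors = new ArrayList<>();
        for (String record : records) {
            producer.send(record, error -> { if (error != null) errors.add(error); });
        }
        producer.flush(); // every callback has run after this point
        if (!errors.isEmpty()) {
            producer.abort();
            return false;
        }
        producer.commit();
        return true;
    }

    public static void main(String[] args) {
        System.out.println(sendAllThenCommit(new FakeProducer(), Arrays.asList("a", "b")));      // true
        System.out.println(sendAllThenCommit(new FakeProducer(), Arrays.asList("a", "too-big-record"))); // false
    }
}
```

A caller that commits without first checking the collected errors is the behaviour the comment describes as the actual target of K9279.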

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using exactly_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using exactly_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> exactly_once when ProductionExceptionHandlerResponse.CONTINUE is used.
>  -- However, if using exactly_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE, and the client is shut down 
> as the default behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT).
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create the "input-topic" and "output-topic" topics.
>  # Put several messages on "input-topic".
>  # Execute a simple Kafka Streams program that transfers too-large messages 
> from "input-topic" to "output-topic" with exactly_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE, and the stream thread 
> shuts down as the default 
> behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even if using exactly_once when ProductionExceptionHandlerResponse.CONTINUE 
> is used.
> [My investigation so far]
>  - FYI, if using at_least_once instead of exactly_once, Kafka Streams 
> continues processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> Exception handler choose to CONTINUE processing in spite of this error but 
> written offsets would not be recorded.
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes 

[jira] [Created] (KAFKA-15309) Add custom error handler to Producer

2023-08-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15309:
---

 Summary: Add custom error handler to Producer
 Key: KAFKA-15309
 URL: https://issues.apache.org/jira/browse/KAFKA-15309
 Project: Kafka
  Issue Type: New Feature
  Components: producer 
Reporter: Matthias J. Sax


The producer batches up multiple records into batches, and a single record 
specific error might fail the whole batch.

This ticket suggest to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`.





[GitHub] [kafka] showuon merged pull request #14133: KAFKA-15189: only init remote topic metrics when enabled

2023-08-04 Thread via GitHub


showuon merged PR #14133:
URL: https://github.com/apache/kafka/pull/14133





[GitHub] [kafka] showuon commented on pull request #14133: KAFKA-15189: only init remote topic metrics when enabled

2023-08-04 Thread via GitHub


showuon commented on PR #14133:
URL: https://github.com/apache/kafka/pull/14133#issuecomment-1666390423

   Failed tests are unrelated:
   ```
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
   Build / JDK 8 and Scala 2.12 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord()
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription(boolean).hasConsumerRack = false
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.clients.consumer.StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription(boolean).hasConsumerRack = false
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testOffsetTranslationBehindReplicationFlow()
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testSyncTopicConfigs()
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testOneWord()
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords()
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig()
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription(boolean).hasConsumerRack = false
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.clients.consumer.StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription(boolean).hasConsumerRack = false
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplicateSourceDefault()
   Build / JDK 17 and Scala 2.13 / kafka.admin.ReassignPartitionsIntegrationTest.testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   ```





[GitHub] [kafka] showuon commented on pull request #14151: KAFKA-15083: add config with "remote.log.metadata" prefix

2023-08-04 Thread via GitHub


showuon commented on PR #14151:
URL: https://github.com/apache/kafka/pull/14151#issuecomment-1666390279

   > Yeah, okay, this pull request makes sense to me. To summarise, you are 
   > saying that configurations starting with `remote.log.metadata` need to be 
   > passed to implementations of the RemoteLogMetadataManager as part of the 
   > rlmmProps, however, they are never put in the rlmmProps by the 
   > RemoteLogManagerConfig?
   > 
   > Test failures appear to be unrelated.
   
   Correct! Thanks.
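
The behaviour confirmed above can be sketched as a prefix filter. This is a hypothetical helper, not the actual `RemoteLogManagerConfig` code: the class name, `withPrefix`, and the example key `remote.log.metadata.example.key` are assumptions; only the prefix value comes from the diff under review.

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixedConfigSketch {
    /** Prefix value taken from the diff under review. */
    public static final String REMOTE_LOG_METADATA_PREFIX = "remote.log.metadata";

    /** Hypothetical helper: returns only the entries whose key starts with the prefix. */
    public static Map<String, Object> withPrefix(Map<String, ?> props, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> entry : props.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> brokerProps = new HashMap<>();
        brokerProps.put("remote.log.metadata.example.key", "value"); // hypothetical key
        brokerProps.put("log.retention.bytes", "1024");

        // Only the prefixed entry would be handed to the metadata manager.
        Map<String, Object> rlmmProps = withPrefix(brokerProps, REMOTE_LOG_METADATA_PREFIX);
        System.out.println(rlmmProps); // {remote.log.metadata.example.key=value}
    }
}
```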





[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once

2023-08-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751263#comment-17751263
 ] 

Matthias J. Sax commented on KAFKA-15259:
-

Thanks for digging into it – that's a good find – I was already wondering, 
because I could not see any code changes between 3.1 and 3.2 in Kafka Streams 
that would explain it.

And yes, if the producer goes into error state, it is impossible for KS to 
"revert" it – thus, I am not sure right now how we could fix it... In the end, 
Kafka Streams calls `producer.flush()` and evaluates whether there are any 
errors, detects the `RecordTooLargeException`, and executes the handler, which 
returns `CONTINUE`, which is respected. If it were not respected, the 
`RecordTooLargeException` would be re-thrown right away. But because Kafka 
Streams does `CONTINUE`, it actually tries to commit, but cannot because the 
producer is already in error state.

A high-level idea would be to remember the input record offset and, 
after we failed and the task is restarted, have an implicit filter that drops 
the input message right away based on the offset we remembered. But such a 
thing would need a very careful design...
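
The high-level idea in the last paragraph could look roughly like the sketch below. It is only an illustration under strong assumptions: the class and method names are hypothetical, and the offsets are kept in memory, whereas a real design would have to persist them so they survive the task restart.

```java
import java.util.HashSet;
import java.util.Set;

public class SkipOffsetFilterSketch {
    // Hypothetical in-memory store; a real design would have to persist this across restarts.
    private final Set<String> offsetsToDrop = new HashSet<>();

    private static String key(String topic, int partition, long offset) {
        return topic + "-" + partition + "@" + offset;
    }

    /** Called when producing the output of this input record failed. */
    public void remember(String topic, int partition, long offset) {
        offsetsToDrop.add(key(topic, partition, offset));
    }

    /** Implicit filter applied after the restart: drop the remembered input record. */
    public boolean shouldDrop(String topic, int partition, long offset) {
        return offsetsToDrop.contains(key(topic, partition, offset));
    }

    public static void main(String[] args) {
        SkipOffsetFilterSketch filter = new SkipOffsetFilterSketch();
        filter.remember("input-topic", 0, 42L);
        System.out.println(filter.shouldDrop("input-topic", 0, 42L)); // true
        System.out.println(filter.shouldDrop("input-topic", 0, 43L)); // false
    }
}
```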


[GitHub] [kafka] mjsax merged pull request #14105: MINOR: improve logging for FK-join

2023-08-04 Thread via GitHub


mjsax merged PR #14105:
URL: https://github.com/apache/kafka/pull/14105





[jira] [Assigned] (KAFKA-15202) MM2 OffsetSyncStore clears too many syncs when sync spacing is variable

2023-08-04 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-15202:
---

Assignee: Greg Harris

> MM2 OffsetSyncStore clears too many syncs when sync spacing is variable
> ---
>
> Key: KAFKA-15202
> URL: https://issues.apache.org/jira/browse/KAFKA-15202
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.5.0, 3.4.1, 3.3.3
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
>
> The spacing between OffsetSyncs can vary significantly, due to conditions in 
> the upstream topic and in the replication rate of the MirrorSourceTask.
> The OffsetSyncStore attempts to keep a maximal number of distinct syncs 
> present, and for regularly spaced syncs it does not allow an incoming sync to 
> expire more than one other unique sync. There are tests to enforce this 
> property.
> For variably spaced syncs, there is no such guarantee, because multiple 
> fine-grained syncs may need to be expired at the same time. However, instead 
> of only those fine-grained syncs being expired, the store may also expire 
> coarser-grained syncs. This causes a large decrease in the number of unique 
> syncs.
> This is an extremely simple example: Syncs: 0 (start), 1, 2, 4.
> The result:
> {noformat}
> TRACE New sync OffsetSync{topicPartition=topic1-2, upstreamOffset=1, 
> downstreamOffset=1} applied, new state is [1:1,0:0] 
> (org.apache.kafka.connect.mirror.OffsetSyncStore:194)
> TRACE New sync OffsetSync{topicPartition=topic1-2, upstreamOffset=2, 
> downstreamOffset=2} applied, new state is [2:2,1:1,0:0] 
> (org.apache.kafka.connect.mirror.OffsetSyncStore:194)
> TRACE New sync OffsetSync{topicPartition=topic1-2, upstreamOffset=4, 
> downstreamOffset=4} applied, new state is [4:4,0:0] 
> (org.apache.kafka.connect.mirror.OffsetSyncStore:194){noformat}
> Instead of being expired, the 2:2 sync should still be present in the final 
> state, allowing the store to maintain 3 unique syncs.
>  





[GitHub] [kafka] gharris1727 opened a new pull request, #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced

2023-08-04 Thread via GitHub


gharris1727 opened a new pull request, #14156:
URL: https://github.com/apache/kafka/pull/14156

   The new OffsetSyncStore historical translation cache clears more syncs than 
necessary when the gap between syncs is variable. This is a problem with the 
replacement promotion logic, which only covered the base case of promoting from 
one index to the immediately following index.
   
   This has the effect that if a sync fails to satisfy an invariant for the 
following index, then it is discarded immediately, even if the value would 
satisfy the invariants at a different index. In particular, invariant B which 
enforces a maximum distance between two syncs gets more permissive as the index 
in the array increases, so a sync which does not satisfy invariant B at index 1 
may satisfy it at index j > 1.
   
   Instead of the greedy discarding algorithm, the replacement promotion logic 
should keep a separate index into the potential replacements from the original 
array, and delay discarding a sync until it can be determined that the sync is 
not valid at any place in the array. In particular, syncs are certainly not 
worth keeping if they are duplicates of other syncs, or if they fail invariant 
C. Invariant C becomes more strict as the index in the array increases, so a 
sync which does not satisfy invariant C at index 1 will certainly never satisfy 
it at index j > 1.
   
   In order to verify the changes, new tests use Random to generate gaps 
between syncs, and generalize the test for maintaining unique syncs to an 
arbitrary stream of gaps. The different tests cover:
   
   1. Constant spacing
   2. Uniform random spacing between 0 and N
   3. Uniform random spacing between M and N with M > 0
   4. Bimodal spacing at maxOffsetLag and 2x maxOffsetLag
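
A minimal sketch of how the four gap patterns listed above could be generated with `java.util.Random`. The class `SyncGapSketch` and its method names are hypothetical and are not taken from the PR; the actual tests may be structured differently.

```java
import java.util.Random;
import java.util.function.LongSupplier;

final class SyncGapSketch {
    /** Case 1: every gap has the same fixed size. */
    static LongSupplier constant(long gap) {
        return () -> gap;
    }

    /** Cases 2 and 3: gaps drawn uniformly from [min, max); min = 0 allows zero-length gaps. */
    static LongSupplier uniform(Random random, long min, long max) {
        return () -> min + (long) (random.nextDouble() * (max - min));
    }

    /** Case 4: each gap is either maxOffsetLag or 2 * maxOffsetLag, chosen at random. */
    static LongSupplier bimodal(Random random, long maxOffsetLag) {
        return () -> random.nextBoolean() ? maxOffsetLag : 2 * maxOffsetLag;
    }

    /** Turns a stream of gaps into non-decreasing upstream offsets, starting at 0. */
    static long[] offsets(LongSupplier gaps, int count) {
        long[] result = new long[count];
        for (int i = 1; i < count; i++) {
            result[i] = result[i - 1] + gaps.getAsLong();
        }
        return result;
    }
}
```

Seeding the `Random` keeps a failing sequence reproducible, which matters when a property such as "the number of unique syncs stays high" only breaks for particular gap sequences.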
   
   The new algorithm is an extension of the existing one, so all of the 
consistent-spacing tests have the exact same behavior.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15308) Wipe Stores upon OffsetOutOfRangeException in ALOS

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15308:

Affects Version/s: (was: 3.4.0)
   (was: 3.5.0)

> Wipe Stores upon OffsetOutOfRangeException in ALOS
> --
>
> Key: KAFKA-15308
> URL: https://issues.apache.org/jira/browse/KAFKA-15308
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0
>Reporter: Colt McNealy
>Priority: Minor
>
> As per this [Confluent Community Slack 
> Thread|https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1690843733272449?thread_ts=1690663361.858559=C48AHTCUQ],
>  Streams currently does not wipe away RocksDB state upon encountering an 
> `OffsetOutOfRangeException` in ALOS.
>  
> `OffsetOutOfRangeException` is a rare case that occurs when a standby task 
> requests offsets that no longer exist in the topic. We should wipe the store 
> for three reasons:
>  # Not wiping the store can be a violation of ALOS since some of the 
> now-missing offsets could have contained tombstone records.
>  # Wiping the store has no performance cost since we need to replay the 
> entirety of what's in the changelog topic anyways.
>  # I have heard (not yet confirmed myself) that we wipe the store in EOS 
> anyways, so fixing this bug could remove a bit of complexity from supporting 
> EOS and ALOS.





[jira] [Created] (KAFKA-15308) Wipe Stores upon OffsetOutOfRangeException in ALOS

2023-08-04 Thread Colt McNealy (Jira)
Colt McNealy created KAFKA-15308:


 Summary: Wipe Stores upon OffsetOutOfRangeException in ALOS
 Key: KAFKA-15308
 URL: https://issues.apache.org/jira/browse/KAFKA-15308
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.5.0, 3.4.0, 3.3.0
Reporter: Colt McNealy


As per this [Confluent Community Slack 
Thread|https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1690843733272449?thread_ts=1690663361.858559=C48AHTCUQ],
 Streams currently does not wipe away RocksDB state upon encountering an 
`OffsetOutOfRangeException` in ALOS.

 

`OffsetOutOfRangeException` is a rare case that occurs when a standby task 
requests offsets that no longer exist in the topic. We should wipe the store 
for three reasons:
 # Not wiping the store can be a violation of ALOS since some of the 
now-missing offsets could have contained tombstone records.
 # Wiping the store has no performance cost since we need to replay the 
entirety of what's in the changelog topic anyways.
 # I have heard (not yet confirmed myself) that we wipe the store in EOS 
anyways, so fixing this bug could remove a bit of complexity from supporting 
EOS and ALOS.
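
The first reason above can be illustrated with a toy model. This is only a sketch: the store is a plain `TreeMap`, the changelog is an in-memory list, and `WipeOnOutOfRangeSketch`, `ChangelogRecord`, and `restore` are hypothetical names that do not correspond to Kafka Streams internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class WipeOnOutOfRangeSketch {
    /** One changelog record; a null value is a tombstone (delete). */
    static final class ChangelogRecord {
        final String key;
        final Integer value;

        ChangelogRecord(String key, Integer value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Applies records to the store in order, honouring tombstones. */
    static void replay(Map<String, Integer> store, List<ChangelogRecord> records) {
        for (ChangelogRecord r : records) {
            if (r.value == null) {
                store.remove(r.key);
            } else {
                store.put(r.key, r.value);
            }
        }
    }

    /** Restores after the out-of-range error, optionally wiping local state first. */
    static Map<String, Integer> restore(Map<String, Integer> localState,
                                        List<ChangelogRecord> survivingChangelog,
                                        boolean wipeFirst) {
        Map<String, Integer> store = wipeFirst ? new TreeMap<>() : new TreeMap<>(localState);
        replay(store, survivingChangelog);
        return store;
    }

    public static void main(String[] args) {
        // The standby had restored a=1 and b=2 before it fell behind.
        Map<String, Integer> local = new TreeMap<>();
        local.put("a", 1);
        local.put("b", 2);
        // Meanwhile b was deleted, and both its record and its tombstone left the topic.
        List<ChangelogRecord> surviving = new ArrayList<>();
        surviving.add(new ChangelogRecord("a", 1));
        surviving.add(new ChangelogRecord("c", 3));
        System.out.println("without wipe: " + restore(local, surviving, false));
        System.out.println("with wipe:    " + restore(local, surviving, true));
    }
}
```

In this model the un-wiped store still contains `b` after the replay, because the tombstone that would have removed it is no longer in the topic. Both paths replay the whole surviving changelog, which is the point of the second reason.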





[jira] [Created] (KAFKA-15307) Kafka Streams configuration docs outdate

2023-08-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15307:
---

 Summary: Kafka Streams configuration docs outdate
 Key: KAFKA-15307
 URL: https://issues.apache.org/jira/browse/KAFKA-15307
 Project: Kafka
  Issue Type: Task
  Components: docs, streams
Reporter: Matthias J. Sax


[https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html]
 needs to be updated.

It's missing a lot of newly added configs, and still lists already removed 
configs.

For deprecated configs, we could consider also removing them, or adding a 
"deprecated config" section and keeping them for the time being.





[GitHub] [kafka] mehbey commented on pull request #14135: KAFKA-14991: Implementation of KIP-937 which improves message timesta…

2023-08-04 Thread via GitHub


mehbey commented on PR #14135:
URL: https://github.com/apache/kafka/pull/14135#issuecomment-1666331731

   Updated PR to address comments





[GitHub] [kafka] mehbey commented on a diff in pull request #14135: KAFKA-14991: Implementation of KIP-937 which improves message timesta…

2023-08-04 Thread via GitHub


mehbey commented on code in PR #14135:
URL: https://github.com/apache/kafka/pull/14135#discussion_r1284932829


##
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:
##
@@ -642,7 +642,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
 props.put(KafkaConfig.CompressionTypeProp, "gzip")
 props.put(KafkaConfig.LogPreAllocateProp, true.toString)
 props.put(KafkaConfig.LogMessageTimestampTypeProp, 
TimestampType.LOG_APPEND_TIME.toString)
-props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000")

Review Comment:
   I have made three types of changes to the tests
   1) Added parametrized tests to validate all new configs and the 
deprecated one. See changes in `PlaintextProducerSendTest.scala` and 
`ProducerRequestTest.scala`
   2) Added test cases for backward compatibility for both `before` and `after` 
cases
   3) Removed unnecessary calls to set 
`TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` to the default value. 
These calls don't add any value to the test since they do not change the 
default value. See example changes in `LogCleanerTest.scala` and 
`LogManagerTest.scala`






[GitHub] [kafka] mjsax commented on a diff in pull request #14150: KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer

2023-08-04 Thread via GitHub


mjsax commented on code in PR #14150:
URL: https://github.com/apache/kafka/pull/14150#discussion_r1284932353


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -890,6 +914,22 @@ public class StreamsConfig extends AbstractConfig {
 in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
 Importance.MEDIUM,
 PROCESSING_GUARANTEE_DOC)
+.define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,
+Type.STRING,
+RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, 
RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC),
+Importance.MEDIUM,
+RACK_AWARE_ASSIGNMENT_STRATEGY_DOC)
+.define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG,
+Type.INT,
+null,

Review Comment:
   Thanks. We will need a docs PR anyway -- can you open one in the next few days 
and include this information? -- Wondering if the `description` in 
`StreamsConfig` should also mention the different default values for both 
assignors and explain that `null` means use those defaults?






[GitHub] [kafka] mjsax commented on pull request #14155: MINOR: update Kafka Streams state.dir doc

2023-08-04 Thread via GitHub


mjsax commented on PR #14155:
URL: https://github.com/apache/kafka/pull/14155#issuecomment-1666311777

   `kafka-site.git` PR: https://github.com/apache/kafka-site/pull/536





[jira] [Commented] (KAFKA-15267) Cluster-wide disablement of Tiered Storage

2023-08-04 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751241#comment-17751241
 ] 

Satish Duggana commented on KAFKA-15267:


Option 4.1 is the option suggested in 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Tiered+Storage+Early+Access+Release+Notes]
 It does not require a KIP. It is a minor change to add to the existing code.

> Cluster-wide disablement of Tiered Storage
> --
>
> Key: KAFKA-15267
> URL: https://issues.apache.org/jira/browse/KAFKA-15267
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>  Labels: tiered-storage
>
> h2. Summary
> KIP-405 defines the configuration {{remote.log.storage.system.enable}} which 
> controls whether all resources needed for Tiered Storage to function are 
> instantiated properly in Kafka. However, the interaction between remote data 
> and Kafka if that configuration is set to false while there are still topics 
> with {{remote.storage.enable}} is undefined. {color:#ff8b00}*We would 
> like to give customers the ability to switch off Tiered Storage on a cluster 
> level and as such would need to define the behaviour.*{color}
> {{remote.log.storage.system.enable}} is a read-only configuration. This means 
> that it can only be changed by *modifying the server.properties* and 
> restarting brokers. As such, the {*}validity of values contained in it is 
> only checked at broker startup{*}.
> This JIRA proposes a few behaviours and a recommendation on a way forward.
> h2. Option 1: Change nothing
> Pros:
>  * No operation.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
> h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster 
> level and do not allow it to be disabled
> Always instantiate all resources for tiered storage. If no special ones are 
> selected use the default ones which come with Kafka.
> Pros:
>  * We solve the problem for moving between versions not allowing TS to be 
> disabled.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
>  * We haven’t quantified how much computer resources (CPU, memory) idle TS 
> components occupy.
>  * TS is a feature not required for running Kafka. As such, while it is still 
> under development we shouldn’t put it on the critical path of starting a 
> broker. In this way, a stray memory leak won’t impact anything on the 
> critical path of a broker.
>  * We are potentially swapping one problem for another. How does TS behave if 
> one decides to swap the TS plugin classes when data has already been written?
> h2. Option 3: Hide topics with tiering enabled
> Customers cannot interact with topics which have tiering enabled. They cannot 
> create new topics with the same names. Retention (and compaction?) do not 
> take effect on files already in local storage.
> Pros:
>  * We do not force data-deletion.
> Cons:
>  * This will be quite involved - the controller will need to know when a 
> broker’s server.properties have been altered; the broker will need to not 
> proceed to delete logs it is not the leader or follower for.
> h2. {color:#00875a}Option 4: Do not start the broker if there are topics with 
> tiering enabled{color} - Recommended
> This option has 2 different sub-options. The first one is that TS cannot be 
> disabled on cluster-level if there are *any* tiering topics - in other words 
> all tiered topics need to be deleted. The second one is that TS cannot be 
> disabled on a cluster-level if there are *any* topics with *tiering enabled* 
> - they can have tiering disabled, but with a retention policy set to delete 
> or retain (as per 
> [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]).
>  A topic can have tiering disabled and remain on the cluster as long as there 
> is no *remote* data when TS is disabled cluster-wide.
> Pros:
>  * We force the customer to be very explicit in disabling tiering of topics 
> prior to disabling TS on the whole cluster.
> Cons:
>  * You have to make certain that all data in remote is deleted (just a 
> disablement of tiered topic is not enough). How do you determine whether all 
> remote has expired if policy is retain? If retain policy in KIP-950 knows 
> that there is data in remote then this should also be able to figure it out.
> The common denominator is that there needs to be no *remote* data at the 
> point of disabling TS. As such, the most straightforward option is to refuse 
> to start brokers if there are topics with the {{remote.storage.enabled}} 
> present. This in essence requires customers to clean any tiered topics before 
> switching off TS, 

[GitHub] [kafka] lihaosky commented on a diff in pull request #14139: KAFKA-15022: [7/N] use RackAwareTaskAssignor in HAAssignor

2023-08-04 Thread via GitHub


lihaosky commented on code in PR #14139:
URL: https://github.com/apache/kafka/pull/14139#discussion_r1284914061


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java:
##
@@ -57,6 +57,7 @@ public StickyTaskAssignor() {
 public boolean assign(final Map clients,
   final Set allTaskIds,
   final Set statefulTaskIds,
+  final RackAwareTaskAssignor rackAwareTaskAssignor,

Review Comment:
   Sounds good



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java:
##
@@ -124,11 +132,20 @@ private static void assignActiveStatefulTasks(final 
SortedMap
 ClientState::assignActive,
 (source, destination) -> true
 );
+
+if (rackAwareTaskAssignor != null && 
rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
+final int trafficCost = configs.rackAwareAssignmentTrafficCost == 
null ?

Review Comment:
   Because the costs are different for stateful and stateless tasks. So setting 
it in the constructor would mean we need to pass in two RackAwareTaskAssignor objects.






[GitHub] [kafka] lihaosky commented on a diff in pull request #14150: KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer

2023-08-04 Thread via GitHub


lihaosky commented on code in PR #14150:
URL: https://github.com/apache/kafka/pull/14150#discussion_r1284910264


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java:
##
@@ -267,25 +267,46 @@ public static class AssignmentConfigs {
 public final int numStandbyReplicas;
 public final long probingRebalanceIntervalMs;
 public final List rackAwareAssignmentTags;
+public final Integer rackAwareAssignmentTrafficCost;

Review Comment:
   It can be null and we will use default in assignor. It's related to your 
questions in https://github.com/apache/kafka/pull/14139#discussion_r1284910181



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -890,6 +914,22 @@ public class StreamsConfig extends AbstractConfig {
 in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
 Importance.MEDIUM,
 PROCESSING_GUARANTEE_DOC)
+.define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,
+Type.STRING,
+RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, 
RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC),
+Importance.MEDIUM,
+RACK_AWARE_ASSIGNMENT_STRATEGY_DOC)
+.define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG,
+Type.INT,
+null,

Review Comment:
   The default value is different for `HAAssignor` and `StickyAssignor`, so the 
null here means use the assignor's default. If it's set, it overrides the 
assignor's default. I guess we can guide users to set relative values. Maybe we 
can mention that the default in HAAssignor is 10, 1 and in the sticky assignor 
it will be 1, 10...
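
A minimal sketch of the "null means use the assignor's default" convention described above. The class, constants, and `resolve` helper are hypothetical names for illustration; the numbers are the ones quoted in this comment, read here as (traffic cost, non-overlap cost), which is an assumption.

```java
final class TrafficCostDefaultSketch {
    // Per-assignor defaults as quoted in the comment; constant names are illustrative only.
    static final int HA_DEFAULT_TRAFFIC_COST = 10;
    static final int HA_DEFAULT_NON_OVERLAP_COST = 1;
    static final int STICKY_DEFAULT_TRAFFIC_COST = 1;
    static final int STICKY_DEFAULT_NON_OVERLAP_COST = 10;

    /** Returns the user's value when one is configured, otherwise the calling assignor's default. */
    static int resolve(Integer configured, int assignorDefault) {
        return configured == null ? assignorDefault : configured;
    }
}
```

With this shape a single nullable config can serve both assignors: `resolve(null, HA_DEFAULT_TRAFFIC_COST)` and `resolve(null, STICKY_DEFAULT_TRAFFIC_COST)` give different results, while an explicit user value overrides both.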



##
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##
@@ -1378,6 +1380,48 @@ public void 
shouldNotEnableAnyOptimizationsWithNoOptimizationConfig() {
 assertEquals(0, configs.size());
 }
 
+@Test
+public void shouldReturnDefaultRackAwareAssignmentConfig() {
+final String strategy = 
streamsConfig.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
+assertEquals("NONE", strategy);
+}
+
+@Test
+public void shouldtSetMinTrafficRackAwareAssignmentConfig() {
+props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, 
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC);
+assertEquals("MIN_TRAFFIC", new 
StreamsConfig(props).getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG));

Review Comment:
   It will use default cost in assignor






[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751236#comment-17751236
 ] 

Matthias J. Sax commented on KAFKA-15302:
-

Thanks for reporting this issue. – When you call `store.all()` you get an 
iterator back that is built over the cache as well as RocksDB. For the 
underlying RocksDB iterator, it provides an immutable snapshot, thus any later 
writes into RocksDB are not visible to the iterator. Thus, if the cache is 
flushed, and we try to read the key from the cache and cannot find it, we go to 
the underlying RocksDB iterator which cannot see the write. This should explain 
it.
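
A toy model of the effect described above, as a sketch only. Two `TreeMap`s stand in for the cache and for RocksDB, and the iterator copies the store when it is created. `StaleIteratorSketch` and its methods are hypothetical and do not mirror the real caching layer.

```java
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

final class StaleIteratorSketch {
    private final TreeMap<String, Integer> store = new TreeMap<>(); // stands in for RocksDB
    private final TreeMap<String, Integer> cache = new TreeMap<>(); // dirty, unflushed writes

    void put(String key, int value) {
        cache.put(key, value);
    }

    /** Point lookup: cache first, then the live store. */
    Integer get(String key) {
        Integer cached = cache.get(key);
        return cached != null ? cached : store.get(key);
    }

    /** Writes all dirty entries to the store and empties the cache. */
    void flush() {
        store.putAll(cache);
        cache.clear();
    }

    /** Iterator that reads the live cache but only a frozen copy of the store. */
    Iterator<Map.Entry<String, Integer>> all() {
        final TreeMap<String, Integer> snapshot = new TreeMap<>(store);
        final TreeSet<String> keys = new TreeSet<>(store.keySet());
        keys.addAll(cache.keySet());
        final Iterator<String> keyIterator = keys.iterator();
        return new Iterator<Map.Entry<String, Integer>>() {
            @Override
            public boolean hasNext() {
                return keyIterator.hasNext();
            }

            @Override
            public Map.Entry<String, Integer> next() {
                String key = keyIterator.next();
                Integer cached = cache.get(key);
                // On a cache miss the iterator can only fall back to its snapshot.
                Integer value = cached != null ? cached : snapshot.get(key);
                return new AbstractMap.SimpleEntry<>(key, value);
            }
        };
    }

    public static void main(String[] args) {
        StaleIteratorSketch s = new StaleIteratorSketch();
        s.put("k", 1);
        s.flush();      // store: k=1
        s.put("k", 2);  // cache: k=2
        Iterator<Map.Entry<String, Integer>> it = s.all(); // snapshot sees k=1
        s.flush();      // store: k=2, cache now empty
        Map.Entry<String, Integer> entry = it.next();
        System.out.println("iterator: " + entry.getValue() + ", get: " + s.get("k"));
    }
}
```

In this model the iterator returns 1 while `get` returns 2, which matches the shape of the reported mismatch once a flush lands between `all()` and `next()`.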

What I am wondering though right now is, why the cache would get flushed to 
begin with? – There should not be an explicit `store.flush()` call because we 
only flush before a `commit()`, which happens on the same thread; we might also 
`evict()` during a `put()` if the cache overflows, but there is no `put()` call 
in between; the third case I could find is when a new `StreamThread` is added 
and we need to resize the cache (this would indeed be a concurrent operation); 
could adding/removing a thread explain what you observe?

Otherwise we would need to do more digging into why the cache is flushed to 
begin with. If we flush incorrectly and can avoid the flush we should be able to 
fix it. If we flush correctly, we might need to have a guard inside the caching 
layer itself and suppress the flush if there is an open iterator (which does 
not actually sound like a great solution, but maybe it would be the correct way 
forward).

> Stale value returned when using store.all() in punctuation function.
> 
>
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Jinyong Choi
>Priority: Major
>
> When using the store.all() function within the Punctuation function of 
> this.context.schedule, the previous value is returned. In other words, even 
> though the value has been stored from 1 to 2, it doesn't return 2; instead, 
> it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!', and while 
> this doesn't occur 100% of the time, by adding logs, it's evident that during 
> the while loop after all() is called, the cache is flushed. As a result, the 
> named cache holds a null value, causing the return of a value from RocksDB. 
> This is observed as the value after the .get() call is different from the 
> expected value. This is possibly due to the consistent read functionality of 
> RocksDB, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be 
> any errors.
>  
>  * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>  
> {code:java}
> private void forwardAll(final long timestamp) {
> //
>     System.err.println("forwardAll Start");
>     KeyValueIterator Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         KeyValue entry = kvList.next();
>         final Record msg = new Record<>(entry.key, 
> entry.value, context.currentSystemTimeMs());
>         final Integer storeValue = this.kvStore.get(entry.key);
>         if (entry.value != storeValue) {
>             System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: 
> " + entry.key + " Expected in stored(Cache or Store) value: " + storeValue + 
> " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }
>         this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
>  * log file (add log in stream source)
>  
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
> KeyValueIterator value: 1
> # log file
> ...
> 01:05:00.382 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
> flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
> dirtyKeys.size():7873 entries:7873
> 01:05:00.434 
> 

[GitHub] [kafka] lihaosky commented on a diff in pull request #14139: KAFKA-15022: [7/N] use RackAwareTaskAssignor in HAAssignor

2023-08-04 Thread via GitHub


lihaosky commented on code in PR #14139:
URL: https://github.com/apache/kafka/pull/14139#discussion_r1284910181


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java:
##
@@ -43,21 +43,27 @@
 
 public class HighAvailabilityTaskAssignor implements TaskAssignor {
 private static final Logger log = 
LoggerFactory.getLogger(HighAvailabilityTaskAssignor.class);
+private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;
+private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 1;
+private static final int DEFAULT_STATELESS_TRAFFIC_COST = 1;
+private static final int DEFAULT_STATELESS_NON_OVERLAP_COST = 1;

Review Comment:
   Yeah. Only the stateful cost is configurable. The default is not set in 
`StreamsConfig` because it will be different for `HAAssignor` and 
`StickyAssignor`. So the default in `StreamsConfig` is null, which means use the 
assignor's default.






[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-04 Thread via GitHub


AndrewJSchofield commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1284873983


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -701,7 +713,9 @@ private long partitionReady(Cluster cluster, long nowMs, 
String topic,
 }
 
 waitedTimeMs = batch.waitedTimeMs(nowMs);
-backingOff = batch.attempts() > 0 && waitedTimeMs < 
retryBackoffMs;
+backingOff = !batch.hasLeaderChanged(leader) &&
+batch.attempts() > 0 &&
+waitedTimeMs < retryBackoff.backoff(batch.attempts() - 
1);

Review Comment:
   Done
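
For readers following the diff above, a hedged sketch of what the new condition computes. It assumes a plain doubling backoff with a cap and no jitter; the real `retryBackoff` object's parameters are not shown in this thread, so `RetryBackoffSketch`, `backoffMs`, `initialMs`, and `maxMs` are hypothetical.

```java
final class RetryBackoffSketch {
    /** Backoff before retry number `retries` (0-based): initialMs * 2^retries, capped at maxMs. */
    static long backoffMs(long initialMs, long maxMs, int retries) {
        // Capping the shift keeps the product in range for realistic millisecond values.
        long factor = 1L << Math.min(retries, 30);
        return Math.min(maxMs, initialMs * factor);
    }

    /** Same shape as the condition in the diff: no backoff on the first attempt or after a leader change. */
    static boolean backingOff(boolean leaderChanged, int attempts, long waitedTimeMs,
                              long initialMs, long maxMs) {
        return !leaderChanged
            && attempts > 0
            && waitedTimeMs < backoffMs(initialMs, maxMs, attempts - 1);
    }
}
```

Passing `attempts - 1` means the first retry waits the initial interval rather than twice that, and a leader change lets the batch be retried against the new leader right away.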






[GitHub] [kafka] mjsax commented on a diff in pull request #14139: KAFKA-15022: [7/N] use RackAwareTaskAssignor in HAAssignor

2023-08-04 Thread via GitHub


mjsax commented on code in PR #14139:
URL: https://github.com/apache/kafka/pull/14139#discussion_r1284866806


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java:
##
@@ -43,21 +43,27 @@
 
 public class HighAvailabilityTaskAssignor implements TaskAssignor {
 private static final Logger log = 
LoggerFactory.getLogger(HighAvailabilityTaskAssignor.class);
+private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;
+private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 1;
+private static final int DEFAULT_STATELESS_TRAFFIC_COST = 1;
+private static final int DEFAULT_STATELESS_NON_OVERLAP_COST = 1;

Review Comment:
   We have different costs for stateful vs stateless here, but the configs 
added only allow passing in one value each for traffic and non-overlap. How 
does this fit together?
   
   From the code below it seems users only configure the cost for stateful anyway, 
but the stateless cost is always `1` (in that case, the variables should not be 
called `DEFAULT` because they cannot be changed).
   
   Also, if we have default values, should they not be set in `StreamsConfig`?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java:
##
@@ -57,6 +57,7 @@ public StickyTaskAssignor() {
 public boolean assign(final Map clients,
   final Set allTaskIds,
   final Set statefulTaskIds,
+  final RackAwareTaskAssignor rackAwareTaskAssignor,

Review Comment:
   Should we make this an `Optional` ?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java:
##
@@ -124,11 +132,20 @@ private static void assignActiveStatefulTasks(final 
SortedMap
 ClientState::assignActive,
 (source, destination) -> true
 );
+
+if (rackAwareTaskAssignor != null && 
rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
+final int trafficCost = configs.rackAwareAssignmentTrafficCost == 
null ?

Review Comment:
   Why do we get `trafficCost` each time? Seems we should set it up in the 
constructor just a single time (or implement `Configurable` interface)? (Same 
for `nonOverlapCost`)



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java:
##
@@ -16,18 +16,29 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import java.util.Optional;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.TaskId;
 
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import org.apache.kafka.streams.processor.internals.InternalTopicManager;
+import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
 
 public interface TaskAssignor {
 /**
  * @return whether the generated assignment requires a followup probing 
rebalance to satisfy all conditions
  */
-boolean assign(Map clients,
-   Set allTaskIds,
-   Set statefulTaskIds,
-   AssignorConfiguration.AssignmentConfigs configs);
+boolean assign(final Map clients,
+   final Set allTaskIds,
+   final Set statefulTaskIds,
+   final Cluster cluster,
+   final Map> partitionsForTask,
+   final Map> 
changelogPartitionsForTask,
+   final Map> tasksForTopicGroup,
+   final Map>> 
racksForProcessConsumer,
+   final InternalTopicManager internalTopicManager,

Review Comment:
   It's internal. I am fine anyway






[GitHub] [kafka] mjsax commented on a diff in pull request #14150: KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer

2023-08-04 Thread via GitHub


mjsax commented on code in PR #14150:
URL: https://github.com/apache/kafka/pull/14150#discussion_r1284793025


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig {
 public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = 
"default.client.supplier";
 public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier 
class that implements the 
org.apache.kafka.streams.KafkaClientSupplier interface.";
 
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE";
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = 
"MIN_TRAFFIC";
+
+/** {@code } rack.aware.assignment.strategy */
+@SuppressWarnings("WeakerAccess")
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = 
"rack.aware.assignment.strategy";
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The 
strategy we use for rack aware assignment. Rack aware assignment will take 
client.rack and racks of TopicPartition into account when assigning"
++ " tasks to minimize cross rack traffic. Valid settings are : " 
+ RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + ", which will disable rack aware 
assignment; " + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC
++ ", which will compute minimum cross rack traffic assignment";

Review Comment:
   ```suggestion
   + ", which will compute minimum cross rack traffic 
assignment.";
   ```



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig {
 public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = 
"default.client.supplier";
 public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier 
class that implements the 
org.apache.kafka.streams.KafkaClientSupplier interface.";
 
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE";
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = 
"MIN_TRAFFIC";
+
+/** {@code } rack.aware.assignment.strategy */
+@SuppressWarnings("WeakerAccess")
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = 
"rack.aware.assignment.strategy";
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The 
strategy we use for rack aware assignment. Rack aware assignment will take 
client.rack and racks of TopicPartition into account when assigning"

Review Comment:
   ```suggestion
   public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The 
strategy we use for rack aware assignment. Rack aware assignment will take 
client.rack and racks of TopicPartition 
into account when assigning"
   ```



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig {
 public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = 
"default.client.supplier";
 public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier 
class that implements the 
org.apache.kafka.streams.KafkaClientSupplier interface.";
 
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE";
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = 
"MIN_TRAFFIC";
+
+/** {@code } rack.aware.assignment.strategy */
+@SuppressWarnings("WeakerAccess")
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = 
"rack.aware.assignment.strategy";
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The 
strategy we use for rack aware assignment. Rack aware assignment will take 
client.rack and racks of TopicPartition into account when assigning"
++ " tasks to minimize cross rack traffic. Valid settings are : " 
+ RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + ", which will disable rack aware 
assignment; " + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC
++ ", which will compute minimum cross rack traffic assignment";
+
+@SuppressWarnings("WeakerAccess")
+public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG = 
"rack.aware.assignment.traffic_cost";
+public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC = "Cost 
associated with cross rack traffic. This config and 
rack.aware.assignment.non_overlap_cost controls whether the "
++ "optimization algorithm favors minimizing cross rack traffic or 
minimize the movement of tasks in existing assignment. If set a larger value " 
+ RackAwareTaskAssignor.class.getName() + " will "
++ "optimize minimizing cross rack traffic";

Review Comment:
   ```suggestion
   + "optimize for minimizing cross rack traffic.";
   ```
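
For context, a minimal sketch of how an application might set the two configs under review. It assumes the string keys shown in the diff above (`rack.aware.assignment.strategy` and `rack.aware.assignment.traffic_cost`) and the `MIN_TRAFFIC` value, all of which may still change before merge. The class name, method name, and the cost value 10 are illustrative only, and plain `java.util.Properties` is used so the snippet runs without Kafka on the classpath.

```java
import java.util.Properties;

class RackAwareConfigSketch {
    // Keys and value copied from the diff under review; not a released API.
    static final String STRATEGY_KEY = "rack.aware.assignment.strategy";
    static final String TRAFFIC_COST_KEY = "rack.aware.assignment.traffic_cost";
    static final String MIN_TRAFFIC = "MIN_TRAFFIC";

    // Builds properties that enable the min-traffic strategy with the given cost.
    static Properties minTrafficProps(int trafficCost) {
        Properties props = new Properties();
        props.setProperty(STRATEGY_KEY, MIN_TRAFFIC);
        props.setProperty(TRAFFIC_COST_KEY, Integer.toString(trafficCost));
        return props;
    }

    public static void main(String[] args) {
        Properties props = minTrafficProps(10); // 10 is an arbitrary example value
        System.out.println(props.getProperty(STRATEGY_KEY));
    }
}
```

In a real application these entries would presumably be merged into the properties passed to `StreamsConfig`; that wiring is omitted here.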



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig {
 public static 

[jira] [Updated] (KAFKA-15306) Integrate committed offsets logic when updating fetching positions

2023-08-04 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15306:
---
Description: Integrate refreshCommittedOffsets logic, currently performed 
by the coordinator, into the update fetch positions performed on every 
iteration of the async consumer poll loop. This should rely on the 
CommitRequestManager to perform the request based on the refactored model, but 
it should reuse the logic for processing the committed offsets and updating the 
positions.   (was: Integrate refreshCommittedOffsets logic, currently performed 
by the coordinator, into the update fetch positions performed on every 
iteration of the consumer poll loop. )

> Integrate committed offsets logic when updating fetching positions
> --
>
> Key: KAFKA-15306
> URL: https://issues.apache.org/jira/browse/KAFKA-15306
> Project: Kafka
>  Issue Type: Task
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: client, consumer, kip-945
>
> Integrate refreshCommittedOffsets logic, currently performed by the 
> coordinator, into the update fetch positions performed on every iteration of 
> the async consumer poll loop. This should rely on the CommitRequestManager to 
> perform the request based on the refactored model, but it should reuse the 
> logic for processing the committed offsets and updating the positions. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15306) Integrate committed offsets logic when updating fetching positions

2023-08-04 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-15306:
--

 Summary: Integrate committed offsets logic when updating fetching 
positions
 Key: KAFKA-15306
 URL: https://issues.apache.org/jira/browse/KAFKA-15306
 Project: Kafka
  Issue Type: Task
Reporter: Lianet Magrans
Assignee: Lianet Magrans


Integrate refreshCommittedOffsets logic, currently performed by the 
coordinator, into the update fetch positions performed on every iteration of 
the consumer poll loop. 





[GitHub] [kafka] ableegoldman commented on pull request #14149: HOTFIX: avoid placement of unnecessary transient standby tasks

2023-08-04 Thread via GitHub


ableegoldman commented on PR #14149:
URL: https://github.com/apache/kafka/pull/14149#issuecomment-1666170754

   Test failures are unrelated
   
   Super small fix, would like to get it into 3.6 though
   
   ping also @vvcephei @showuon @wcarlson5 for a review





[jira] [Commented] (KAFKA-15090) Source tasks are no longer stopped on a separate thread

2023-08-04 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751214#comment-17751214
 ] 

Chris Egerton commented on KAFKA-15090:
---

This is a tricky one. First off, although we're almost certainly not going to 
move calls to {{SourceTask::stop}} back onto the herder tick thread, it's not 
unreasonable to try to restore some of the original behavior by spinning up a 
separate thread that can be used to call that method (so that we don't block on 
tasks returning from {{SourceTask::poll}} before being able to stop them).

However, this complicates the logic for task cleanup. Currently, tasks can 
deallocate all of their resources in {{SourceTask::stop}}, secure in the 
knowledge that the runtime will never poll them again or notify them of 
committed offsets. If we go back to potentially invoking {{SourceTask::stop}} 
and {{SourceTask::poll}} concurrently, then it becomes difficult for tasks to 
know when exactly they can deallocate resources. This is the motivation behind 
the now-abandoned 
[KIP-419|https://cwiki.apache.org/confluence/display/KAFKA/KIP-419%3A+Safely+notify+Kafka+Connect+SourceTask+is+stopped].

 

One alternative is to keep most of the current behavior, but with stronger 
semantics for cancelling source tasks. This is described in KAFKA-14725; to 
summarize: we would continue invoking {{SourceTask::stop}} and 
{{SourceTask::poll}} on the same thread like we do today, but would start 
interrupting the poll-convert-produce thread when tasks exceed the graceful 
shutdown timeout.

Advantages of this alternative are that it preserves behavior that has been 
around in the Connect runtime for the last five releases, and makes it easier 
for developers to correctly implement resource deallocation logic in their 
source connectors. Disadvantages are that it does not make graceful task 
shutdown easier (increasing the likelihood of [ERROR-level log 
messages|https://github.com/apache/kafka/blob/b3db905b27ff4133f4018ac922c9ce2beb2d6087/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1035]),
 and may not work for all connectors (interrupting a thread in Java is not 
guaranteed to actually interrupt some operations, even if they are blocking).
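
To illustrate the interruption caveat, here is a minimal sketch; it is not Connect runtime code and all names are made up. A hypothetical poll thread blocks in `BlockingQueue.take()`, which does respond to `Thread.interrupt()`. A task blocked in an operation that ignores interruption (classic blocking socket I/O, for example) would not be unblocked this way.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptSketch {
    // Returns true if the blocked "poll" thread observed the interrupt and exited.
    static boolean interruptBlockedPoll() {
        BlockingQueue<String> records = new LinkedBlockingQueue<>();
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Thread pollThread = new Thread(() -> {
            try {
                records.take(); // would block forever: nothing is ever enqueued
            } catch (InterruptedException e) {
                sawInterrupt.set(true); // an interruptible call unblocks here
            }
        });
        pollThread.start();

        // Stand-in for what the runtime would do once the graceful timeout expires.
        pollThread.interrupt();
        try {
            pollThread.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return sawInterrupt.get() && !pollThread.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("poll thread unblocked by interrupt: " + interruptBlockedPoll());
    }
}
```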

 

Finally, we could pursue something identical or similar to 
[KIP-419|https://cwiki.apache.org/confluence/display/KAFKA/KIP-419%3A+Safely+notify+Kafka+Connect+SourceTask+is+stopped],
 by modifying the source task API to have separate methods for "triggering" a 
stop (which signals that the task should immediately return from any future or 
in-progress calls to {{SourceTask::poll}}) and "performing" a stop (wherein 
the task deallocates resources).
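
A rough sketch of the two-phase shape described above, assuming hypothetical method names `triggerStop()` and `performStop()`. Neither exists in the current `SourceTask` API, and this is not the KIP-419 proposal verbatim. `triggerStop()` may be called from any thread and only signals; `performStop()` runs on the task's own thread after `poll()` has returned, so resources are never freed while a poll is in progress.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TwoPhaseStopTaskSketch {
    private final CountDownLatch stopRequested = new CountDownLatch(1);
    private volatile boolean resourcesOpen = true;

    // Waits until a stop is triggered or the one-second wait times out;
    // this sketch never produces data.
    List<String> poll() throws InterruptedException {
        stopRequested.await(1, TimeUnit.SECONDS);
        return Collections.emptyList();
    }

    // Phase 1 (hypothetical): safe to call concurrently with poll(); only signals.
    void triggerStop() {
        stopRequested.countDown();
    }

    // Phase 2 (hypothetical): called on the polling thread once poll() has returned.
    void performStop() {
        resourcesOpen = false;
    }

    boolean resourcesOpen() {
        return resourcesOpen;
    }

    // Drives one task roughly the way a runtime might; true means a clean shutdown.
    static boolean runAndStop() {
        TwoPhaseStopTaskSketch task = new TwoPhaseStopTaskSketch();
        Thread worker = new Thread(() -> {
            try {
                while (task.stopRequested.getCount() > 0) {
                    task.poll();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                task.performStop(); // same thread as poll(), so no concurrent cleanup
            }
        });
        worker.start();
        task.triggerStop(); // issued from a different thread than the one polling
        try {
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive() && !task.resourcesOpen();
    }

    public static void main(String[] args) {
        System.out.println("clean shutdown: " + runAndStop());
    }
}
```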

> Source tasks are no longer stopped on a separate thread
> ---
>
> Key: KAFKA-15090
> URL: https://issues.apache.org/jira/browse/KAFKA-15090
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 
> 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 3.3.2, 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 
> 3.3.3, 3.6.0, 3.5.1
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Before [https://github.com/apache/kafka/pull/9669], in distributed mode, the 
> {{SourceTask::stop}} method would be invoked on the herder tick thread, which 
> is a separate thread from the dedicated thread which was responsible for 
> polling data from the task and producing it to Kafka.
> This aligned with the Javadocs for {{SourceTask::poll}}, which state:
> {quote}The task will be stopped on a separate thread, and when that happens 
> this method is expected to unblock, quickly finish up any remaining 
> processing, and return.
> {quote}
> However, it came with the downside that the herder's tick thread would be 
> blocked until the invocation of {{SourceTask::stop}} completed, which could 
> result in major parts of the worker's REST API becoming unavailable and even 
> the worker falling out of the cluster.
> As a result, in [https://github.com/apache/kafka/pull/9669], we changed the 
> logic for task shutdown to cause {{SourceTask::stop}} to be invoked on the 
> dedicated thread for the task (i.e., the one responsible for polling data 
> from it and producing that data to Kafka).
> This altered the semantics for {{SourceTask::poll}} and {{SourceTask::stop}} 
> and may have broken connectors that block during {{poll}} with the 
> expectation that {{stop}} can and will be invoked concurrently as a signal 
> that any ongoing polls should be interrupted immediately.
> Although reverting the fix is likely not a viable option (blocking the herder 
> thread on interactions with user-written plugins is high-risk and we have 
> tried to eliminate all instances of this where 

[jira] [Commented] (KAFKA-15303) Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema

2023-08-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751212#comment-17751212
 ] 

Matthias J. Sax commented on KAFKA-15303:
-

Thanks for filing this ticket.

Kafka Streams has only limited support for schema evolution, because Kafka 
Streams is schema agnostic – the runtime does not even know what format is 
used, and thus cannot reason about it. I updated the ticket as "improvement" 
because it's not a bug: the system works as designed.

In the end, Kafka Streams uses whatever Serde you provide, so it's not clear 
whether we could even fix it on our end. Maybe you could put some hack into the 
Serde you provide to fix it? Unfortunately, it's not currently possible to get 
the original raw bytes (which would allow us to avoid the re-serialization to 
begin with).

> Foreign key joins no longer triggered by events on the right side of the join 
> after deployment with a new compatible Avro schema
> 
>
> Key: KAFKA-15303
> URL: https://issues.apache.org/jira/browse/KAFKA-15303
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Charles-Eddy
>Priority: Major
> Attachments: image (1).png
>
>
> Hello everyone, I am currently working on a project that uses Kafka Streams 
> (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK.
> Our goal is to join offer information from our sellers with additional data 
> from various input topics, and then feed the resulting joined information 
> into an output topic.
> Our application is deployed in Kubernetes using the StatefulSet feature, with 
> one EBS volume per Kafka Streams pod and 5 Streams Threads per pod.
> We use Avro to serialize and deserialize the input topics and the data stored 
> in the Kafka Streams state stores.
> We have encountered a bug in Kafka Streams that prevents us from deploying 
> new versions of Kafka Streams containing new compatible Avro schemas of our 
> input topics.
> The symptom is that after deploying our new version, which contains no 
> changes in topology but only changes to the Avro schema used, we discard 
> every event coming from the right part of the join concerned by these Avro 
> schema changes until we receive something from the left part of the join.
> As a result, we are losing events and corrupting our output topics and stores 
> with outdated information.
> After checking the local help for the priority to assign, I have assigned it 
> as *CRITICAL* because we are losing data (for example, tombstones are not 
> propagated to the following joins, so some products are still visible on our 
> website when they should not be).
> Please feel free to change the priority if you think it is not appropriate.
>  
> *The bug:*
> After multiple hours of investigation we found out that the bug is located in 
> the foreign key join feature and specifically in this class: 
> *SubscriptionResolverJoinProcessorSupplier* in the left part of a foreign key 
> join. 
> This class's process(...) method computes a hash from the local store by 
> re-serializing a deserialized value from the left state store, and compares 
> it with the hash of the original message from the 
> subscription-response-topic. 
>  
> It means that when we deploy a new version of our Kafka Streams instance with 
> a new compatible Avro schema on the left side of a join, every join 
> triggered by the right part of the join is invalidated until we receive all 
> the events again on the left side. Every join triggered by the right part of 
> the join is discarded because all the hashes computed by Kafka Streams now 
> differ from those of the original messages.
>  
> *How to reproduce it:*
> If we take a working and a non-working workflow, it will do something like 
> this:
> +Normal non-breaking+ workflow from the left part of the FK join:
>  # A new offer event occurs. The offer is received and stored (v1).
>  # A subscription registration is sent with the offer-hash (v1).
>  # The subscription is saved to the store with the v1 offer-hash.
>  # Product data is searched for.
>  # If product data is found, a subscription response is sent back, including 
> the v1 offer-hash.
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v1) and response event (also v1) are compared.
>  # Finally, the join result is sent.
> New product event from the right part of the FK join:
>  # The product is received and stored.
>  # All related offers in the registration store are searched for.
>  # A subscription response is sent for each offer, including their offer hash 
> (v1).
>  # The offer data in the store is searched for and the offer 

[jira] [Updated] (KAFKA-15303) Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15303:

Issue Type: Improvement  (was: Bug)


[jira] [Updated] (KAFKA-15303) Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15303:

Priority: Major  (was: Critical)

> Foreign key joins no longer triggered by events on the right side of the join 
> after deployment with a new compatible Avro schema
> 
>
> Key: KAFKA-15303
> URL: https://issues.apache.org/jira/browse/KAFKA-15303
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Charles-Eddy
>Priority: Major
> Attachments: image (1).png
>
>
> Hello everyone, I am currently working on a project that uses Kafka Streams 
> (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK.
> Our goal is to join offer information from our sellers with additional data 
> from various input topics, and then feed the resulting joined information 
> into an output topic.
> Our application is deployed in Kubernetes using the StatefulSet feature, with 
> one EBS volume per Kafka Streams pod and 5 Streams Threads per pod.
> We use Avro to serialize and deserialize the input topics and the data stored 
> in the Kafka Streams state stores.
> We have encountered a bug in Kafka Streams that prevents us from deploying 
> new versions of Kafka Streams containing new compatible Avro schemas of our 
> input topics.
> The symptom is that after deploying our new version, which contains no 
> changes in topology but only changes to the Avro schema used, we discard 
> every event coming from the right part of the join concerned by these Avro 
> schema changes until we receive something from the left part of the join.
> As a result, we are losing events and corrupting our output topics and stores 
> with outdated information.
> After checking the local help for the priority to assign, I have assigned it 
> as *CRITICAL* because we are losing data (for example, tombstones are not 
> propagated to the following joins, so some products are still visible on our 
> website when they should not be).
> Please feel free to change the priority if you think it is not appropriate.
>  
> *The bug:*
> After multiple hours of investigation we found out that the bug is located in 
> the foreign key join feature and specifically in this class: 
> *SubscriptionResolverJoinProcessorSupplier* in the left part of a foreign key 
> join. 
> This class's process(...) method computes a hash from the local store by 
> re-serializing a deserialized value from the left state store, and compares 
> it with the hash of the original message from the 
> subscription-response-topic. 
>  
> It means that when we deploy a new version of our Kafka Streams instance with 
> a new compatible Avro schema on the left side of a join, every join 
> triggered by the right part of the join is invalidated until we receive all 
> the events again on the left side. Every join triggered by the right part of 
> the join is discarded because all the hashes computed by Kafka Streams now 
> differ from those of the original messages.
>  
> *How to reproduce it:*
> If we take a working and a non-working workflow, it will do something like 
> this:
> +Normal non-breaking+ workflow from the left part of the FK join:
>  # A new offer event occurs. The offer is received and stored (v1).
>  # A subscription registration is sent with the offer-hash (v1).
>  # The subscription is saved to the store with the v1 offer-hash.
>  # Product data is searched for.
>  # If product data is found, a subscription response is sent back, including 
> the v1 offer-hash.
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v1) and response event (also v1) are compared.
>  # Finally, the join result is sent.
> New product event from the right part of the FK join:
>  # The product is received and stored.
>  # All related offers in the registration store are searched for.
>  # A subscription response is sent for each offer, including their offer hash 
> (v1).
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v1) and response event (also v1) are compared.
>  # Finally, the join result is sent.
>  
> +Breaking workflow:+ 
> The offer serializer is changed to offer v2
> New product event from the right part of the FK join: 
>  # The product is received and stored.
>  # All related offers in the registration store are searched for.
>  # A subscription response is sent for each offer, including their offer hash 
> (v1).
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v2) and response event (still v1) are compared.
>  # The join result is not sent since the hash is different.
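
A minimal sketch of the mismatch described above, with stand-ins throughout. The two string "serializers" imitate a v1 schema and a compatible v2 schema that adds a defaulted field, and SHA-256 stands in for whatever hash Kafka Streams actually uses. None of these names come from the Kafka code base.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.function.Function;

class FkJoinHashSketch {
    // Hypothetical v1 serializer: only the offer id.
    static final Function<String, byte[]> V1 =
        offerId -> ("offer:" + offerId).getBytes(StandardCharsets.UTF_8);
    // Hypothetical v2 serializer: same data plus a new field with a default value.
    static final Function<String, byte[]> V2 =
        offerId -> ("offer:" + offerId + ";currency:EUR").getBytes(StandardCharsets.UTF_8);

    // SHA-256 is an assumption made for this sketch, not the hash Kafka Streams uses.
    static byte[] hash(byte[] bytes) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(bytes);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is mandatory on every Java platform
        }
    }

    // Mimics the check: the hash carried in the subscription response versus the
    // hash of the value re-serialized after being read back from the left-side store.
    static boolean joinResultEmitted(String offerId,
                                     Function<String, byte[]> serializerAtSubscription,
                                     Function<String, byte[]> serializerNow) {
        byte[] responseHash = hash(serializerAtSubscription.apply(offerId));
        byte[] storeHash = hash(serializerNow.apply(offerId));
        return Arrays.equals(responseHash, storeHash);
    }

    public static void main(String[] args) {
        System.out.println("same schema: " + joinResultEmitted("42", V1, V1));
        System.out.println("after upgrade: " + joinResultEmitted("42", V1, V2));
    }
}
```

The logical offer is identical in both calls; only the bytes produced by re-serialization differ, which is enough to make the comparison fail.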

[GitHub] [kafka] gharris1727 commented on pull request #14092: KAFKA-15239: Fix system tests using producer performance service

2023-08-04 Thread via GitHub


gharris1727 commented on PR #14092:
URL: https://github.com/apache/kafka/pull/14092#issuecomment-1666104751

   @fvaleri That does appear to work, because [server-common is not being 
excluded from 
tools-dependant-libs](https://github.com/apache/kafka/blob/b3db905b27ff4133f4018ac922c9ce2beb2d6087/build.gradle#L1895-L1897)
 like clients is. I don't think this was intentional; I think it was 
accidentally left out when server-common was added as a dependency in #12469. 
There is a compatibility boundary between server-common and clients in this 
case, but that was already there before your change.
   
   If we choose not to change this compatibility boundary from between 
server-common + clients to between tools + server-common, then I think the fix 
you propose is possible. I agree with your reasoning that server-common is a 
reasonable dependency for the connect-runtime, even if its only present use is 
the ThroughputThrottler. I don't really mind whether it's in clients or 
server-common, as long as it's visible in the dependent modules.
   
   If someone later moves the compatibility boundary, and reduces the range of 
the version mixing (by changing the if condition to `if node.version < 
LATEST_1_1`) then moving this class around will have no effect anyway, so this 
fix doesn't make it any harder to add more fixes later.
   
   Can you revert the changes in producer_performance.py? They don't seem to 
have an effect anymore.





[jira] [Updated] (KAFKA-15090) Source tasks are no longer stopped on a separate thread

2023-08-04 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15090:
--
Description: 
Before [https://github.com/apache/kafka/pull/9669], in distributed mode, the 
{{SourceTask::stop}} method would be invoked on the herder tick thread, which 
is a separate thread from the dedicated thread which was responsible for 
polling data from the task and producing it to Kafka.

This aligned with the Javadocs for {{SourceTask::poll}}, which state:
{quote}The task will be stopped on a separate thread, and when that happens 
this method is expected to unblock, quickly finish up any remaining processing, 
and return.
{quote}
However, it came with the downside that the herder's tick thread would be 
blocked until the invocation of {{SourceTask::stop}} completed, which could 
result in major parts of the worker's REST API becoming unavailable and even 
the worker falling out of the cluster.

As a result, in [https://github.com/apache/kafka/pull/9669,] we changed the 
logic for task shutdown to cause {{SourceTask::stop}} to be invoked on the 
dedicated thread for the task (i.e., the one responsible for polling data from 
it and producing that data to Kafka).

This altered the semantics for {{SourceTask::poll}} and {{SourceTask::stop}} and 
may have broken connectors that block during {{poll}} with the expectation that 
{{stop}} can and will be invoked concurrently as a signal that any ongoing 
polls should be interrupted immediately.

Although reverting the fix is likely not a viable option (blocking the herder 
thread on interactions with user-written plugins is high-risk and we have tried 
to eliminate all instances of this where feasible), we may try to restore the 
expected contract by spinning up a separate thread exclusively for invoking 
{{SourceTask::stop}} separately from the dedicated thread for the task and the 
herder's thread.

  was:
Before [https://github.com/apache/kafka/pull/9669,] in distributed mode, the 
{{SourceTask::stop}} method would be invoked on the herder tick thread, which 
is a separate thread from the dedicated thread which was responsible for 
polling data from the task and producing it to Kafka.

This aligned with the Javadocs for {{{}SourceTask:poll{}}}, which state:
{quote}The task will be stopped on a separate thread, and when that happens 
this method is expected to unblock, quickly finish up any remaining processing, 
and return.
{quote}
However, it came with the downside that the herder's tick thread would be 
blocked until the invocation of {{SourceTask::stop}} completed, which could 
result in major parts of the worker's REST API becoming unavailable and even 
the worker falling out of the cluster.

As a result, in [https://github.com/apache/kafka/pull/9669,] we changed the 
logic for task shutdown to cause {{SourceTask::stop}} to be invoked on the 
dedicated thread for the task (i.e., the one responsible for polling data from 
it and producing that data to Kafka).

This altered the semantics for {{SourceTask:poll}} and {{SourceTask::stop}} and 
may have broken connectors that block during {{poll}} with the expectation that 
{{stop}} can and will be invoked concurrently as a signal that any ongoing 
polls should be interrupted immediately.

Although reverting the fix is likely not a viable option (blocking the herder 
thread on interactions with user-written plugins is high-risk and we have tried 
to eliminate all instances of this where feasible), we may try to restore the 
expected contract by spinning up a separate thread exclusively for invoking 
{{SourceTask::stop}} separately from the dedicated thread for the task and the 
herder's thread.


> Source tasks are no longer stopped on a separate thread
> ---
>
> Key: KAFKA-15090
> URL: https://issues.apache.org/jira/browse/KAFKA-15090
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 
> 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 3.3.2, 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 
> 3.3.3, 3.6.0, 3.5.1
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Before [https://github.com/apache/kafka/pull/9669], in distributed mode, the 
> {{SourceTask::stop}} method would be invoked on the herder tick thread, which 
> is a separate thread from the dedicated thread which was responsible for 
> polling data from the task and producing it to Kafka.
> This aligned with the Javadocs for {{SourceTask::poll}}, which state:
> {quote}The task will be stopped on a separate thread, and when that happens 
> this method is expected to unblock, quickly finish up any remaining 
> processing, and return.
> {quote}
> However, it came with the downside that the herder's tick thread 

[jira] [Created] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired

2023-08-04 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15305:
--

 Summary: The background thread should try to process the remaining 
task until the shutdown timer is expired
 Key: KAFKA-15305
 URL: https://issues.apache.org/jira/browse/KAFKA-15305
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


Found while working on https://issues.apache.org/jira/browse/KAFKA-15304

The close() API supplies a timeout parameter so that the consumer has a grace 
period to process things before shutting down.  The background thread currently 
doesn't honor it: when close() is initiated, it immediately closes all of 
its dependencies.

This might not be desirable because there could be remaining tasks to be 
processed before closing.  Maybe the correct thing to do is to first stop 
accepting API requests, then let runOnce() continue to run until the 
shutdown timer expires, and only then force-close all of its dependencies.
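
A rough sketch of that shutdown sequence, assuming a much simpler background thread than the real one: {{BackgroundThreadSketch}}, {{submit}} and the task queue are hypothetical names used only for illustration, and only {{runOnce()}} and {{close()}} echo names from this ticket.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of a background thread's work loop; not Kafka's class.
class BackgroundThreadSketch {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private boolean accepting = true;

    // Rejects new work once close() has started.
    synchronized boolean submit(Runnable task) {
        if (!accepting) {
            return false;
        }
        tasks.add(task);
        return true;
    }

    // One loop iteration: runs at most one pending task.
    synchronized boolean runOnce() {
        Runnable next = tasks.poll();
        if (next == null) {
            return false;
        }
        next.run();
        return true;
    }

    // Step 1: stop accepting requests. Step 2: keep calling runOnce() until
    // the queue is empty or the timer expires. Step 3: force-close, dropping
    // whatever is left. Returns the number of abandoned tasks.
    synchronized int close(long timeoutMs) {
        accepting = false;
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (System.nanoTime() - deadline < 0 && runOnce()) {
            // keep processing remaining tasks within the grace period
        }
        int abandoned = tasks.size();
        tasks.clear();
        return abandoned;
    }
}
```

With a generous timeout every queued task gets a chance to run; with a timeout of zero the sketch degenerates to the current behavior of closing immediately.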



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15020) integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor test is flaky

2023-08-04 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15020:
--
Component/s: unit tests

> integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor
>  test is flaky
> --
>
> Key: KAFKA-15020
> URL: https://issues.apache.org/jira/browse/KAFKA-15020
> Project: Kafka
>  Issue Type: Test
>  Components: unit tests
>Reporter: Atul Sharma
>Priority: Major
>  Labels: flaky-test
>
> Sometimes the test fails with the following log:
> {code:java}
> Gradle Test Run :core:integrationTest > Gradle Test Executor 175 > 
> FetchFromFollowerIntegrationTest > testRackAwareRangeAssignor() FAILED
> org.opentest4j.AssertionFailedError: Consumed 0 records before timeout 
> instead of the expected 2 records
> at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
> at org.junit.jupiter.api.Assertions.fail(Assertions.java:135)
> at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1087)
> at 
> integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$11(FetchFromFollowerIntegrationTest.scala:216)
> at 
> integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$11$adapted(FetchFromFollowerIntegrationTest.scala:215)
> at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at 
> integration.kafka.server.FetchFromFollowerIntegrationTest.verifyAssignments$1(FetchFromFollowerIntegrationTest.scala:215)
>  at 
> integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor(FetchFromFollowerIntegrationTest.scala:244)
> {code}





[jira] [Updated] (KAFKA-15101) Improve testRackAwareRangeAssignor test

2023-08-04 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15101:
--
Labels: flaky-test  (was: )

> Improve testRackAwareRangeAssignor test
> ---
>
> Key: KAFKA-15101
> URL: https://issues.apache.org/jira/browse/KAFKA-15101
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Priority: Major
>  Labels: flaky-test
>
> testRackAwareRangeAssignor has been really flaky recently. As a mitigation, 
> we have increased the timeouts to 30s in the test. This should already 
> improve it. However, it may be better to revise the entire test to avoid 
> those.





[jira] [Updated] (KAFKA-15101) Improve testRackAwareRangeAssignor test

2023-08-04 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15101:
--
Component/s: unit tests

> Improve testRackAwareRangeAssignor test
> ---
>
> Key: KAFKA-15101
> URL: https://issues.apache.org/jira/browse/KAFKA-15101
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: David Jacot
>Priority: Major
>  Labels: flaky-test
>
> testRackAwareRangeAssignor has been really flaky recently. As a mitigation, 
> we have increased the timeouts to 30s in the test. This should already 
> improve it. However, it may be better to revise the entire test to avoid 
> those.





[jira] [Commented] (KAFKA-15101) Improve testRackAwareRangeAssignor test

2023-08-04 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751187#comment-17751187
 ] 

Kirk True commented on KAFKA-15101:
---

Looks like a duplicate of KAFKA-15020.

> Improve testRackAwareRangeAssignor test
> ---
>
> Key: KAFKA-15101
> URL: https://issues.apache.org/jira/browse/KAFKA-15101
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Priority: Major
>
> testRackAwareRangeAssignor has been really flaky recently. As a mitigation, 
> we have increased the timeouts to 30s in the test. This should already 
> improve it. However, it may be better to revise the entire test to avoid 
> those.





[jira] [Created] (KAFKA-15304) CompletableApplicationEvents aren't being completed when the consumer is closing

2023-08-04 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15304:
--

 Summary: CompletableApplicationEvents aren't being completed when 
the consumer is closing
 Key: KAFKA-15304
 URL: https://issues.apache.org/jira/browse/KAFKA-15304
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


If the background thread is closed before ingesting all ApplicationEvents, we 
should drain the background queue and try to cancel these events before 
closing. We can try to process these events before closing down the consumer; 
however, we assume that when the user issues a close command, the consumer 
should be shut down promptly.
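
One way to picture the "drain and cancel" option is the sketch below. It is an assumption-laden illustration: {{CompletableEventSketch}} and {{EventQueueCloser}} are hypothetical stand-ins, not the consumer's actual event classes, and cancelling via {{CompletableFuture.cancel}} is just one possible way to complete the pending futures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

// Hypothetical event carrying a future that the caller may be waiting on.
class CompletableEventSketch {
    final String name;
    final CompletableFuture<Void> future = new CompletableFuture<>();

    CompletableEventSketch(String name) {
        this.name = name;
    }
}

class EventQueueCloser {
    // Removes every event still sitting in the queue and cancels its future,
    // so that no caller is left blocked on an event that will never be
    // processed. Returns the number of events cancelled.
    static int drainAndCancel(BlockingQueue<CompletableEventSketch> queue) {
        List<CompletableEventSketch> pending = new ArrayList<>();
        queue.drainTo(pending);
        for (CompletableEventSketch event : pending) {
            // The flag has no effect for CompletableFuture; the future is
            // simply completed with a CancellationException.
            event.future.cancel(false);
        }
        return pending.size();
    }
}
```

Callers blocked on one of these futures would then see a CancellationException instead of waiting indefinitely.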





[GitHub] [kafka] ruslankrivoshein commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

2023-08-04 Thread via GitHub


ruslankrivoshein commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1665982738

   @fvaleri I summon thee. Please, take one more look




[GitHub] [kafka] lianetm commented on a diff in pull request #14123: MINOR: Fix committed API in the PrototypeAsyncConsumer timeout

2023-08-04 Thread via GitHub


lianetm commented on code in PR #14123:
URL: https://github.com/apache/kafka/pull/14123#discussion_r1284689796


##
core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala:
##
@@ -35,8 +39,12 @@ class BaseAsyncConsumerTest extends AbstractConsumerTest {
 consumer.commitAsync(cb)
 waitUntilTrue(() => {
   cb.successCount == 1
-}, "wait until commit is completed successfully", 5000)
+}, "wait until commit is completed successfully", 
defaultBlockingAPITimeoutMs)
+val committedOffset = consumer.committed(Set(tp).asJava, 
Duration.ofMillis(defaultBlockingAPITimeoutMs))
+
 assertTrue(consumer.assignment.contains(tp))
+assertNotNull(committedOffset)
+assertNull(committedOffset.get(tp))

Review Comment:
   Since this test covers the no-consumption case, the `sendRecords` call could 
be removed in both tests, I guess.





[jira] [Updated] (KAFKA-14682) Unused stubbings are not reported by Mockito during CI builds

2023-08-04 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14682:
--
Description: 
We've started using [strict 
stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html]
 for unit tests written with Mockito, which is supposed to automatically fail 
tests when they set up mock expectations that go unused.

However, these failures are not reported during Jenkins builds, even if they 
are reported when building/testing locally.

In at least one case, this difference appears to be because our [Jenkins 
build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35]
 uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the 
project's [Gradle build 
file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543],
 instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the latter 
instead of the former, which can cause tests to fail due to unnecessary 
stubbings when being run in that IDE but not when being built on Jenkins.

It's possible that, because the custom test tasks filter out some tests from 
running, Mockito does not check for unnecessary stubbings in order to avoid 
incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} method.

 

This exact behavior has been reported elsewhere as a [Gradle 
issue|https://github.com/gradle/gradle/issues/10694]; based on [discussion on 
that 
thread|https://github.com/gradle/gradle/issues/10694#issuecomment-1374911274], 
it appears this is a known and somewhat-intentional limitation of Mockito:
{quote}I spent some time trying to solve this and eventually I stumbled upon 
this piece in Mockito's JUnit runner:

[https://github.com/mockito/mockito/blob/main/src/main/java/org/mockito/internal/runners/StrictRunner.java#L47-L53]


// only report when:
// 1. if all tests from given test have ran (filter requested is false)
// Otherwise we would report unnecessary stubs even if the user runs just single test
// from the class
// 2. tests are successful (we don't want to add an extra failure on top of any existing
// failure, to avoid confusion)
 
(1) suggests that skipping unused stub validation is the intended behavior when 
the user filters a single test from the class. However, this behavior applies 
to any type of filter.

And Gradle indeed applies a {{CategoryFilter}} if categories are configured: 
[https://github.com/rieske/gradle/blob/e82029abb559d620fdfecb4708a95c6313af625c/subprojects/testing-jvm-infrastructure/src/main/java/org/gradle/api/internal/tasks/testing/junit/JUnitTestClassExecutor.java#L70-L96]

Which then causes Mockito to not validate unused stubs.
{quote}

  was:
We've started using [strict 
stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html]
 for unit tests written with Mockito, which is supposed to automatically fail 
tests when they set up mock expectations that go unused.

However, these failures are not reported during Jenkins builds, even if they 
are reported when building/testing locally.

In at least one case, this difference appears to be because our [Jenkins 
build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35]
 uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the 
project's [Gradle build 
file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543],
 instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the latter 
instead of the former, which can cause tests to fail due to unnecessary 
stubbings when being run in that IDE but not when being built on Jenkins.

It's possible that, because the custom test tasks filter out some tests from 
running, Mockito does not check for unnecessary stubbings in order to avoid 
incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} method.

 

This exact behavior has been reported elsewhere as a [Gradle 
issue|https://github.com/gradle/gradle/issues/10694]; based on [discussion on 
that 
thread|https://github.com/gradle/gradle/issues/10694#issuecomment-1374911274], 
it appears this is a known and somewhat-intentional limitation of Mockito:
{quote}I spent some time trying to solve this and eventually I stumbled upon 
this piece in Mockito's JUnit runner:

[https://github.com/mockito/mockito/blob/main/src/main/java/org/mockito/internal/runners/StrictRunner.java#L47-L53]
{{ }}
{{}}
{{   // only report when:
// 1. if all tests from given test have ran (filter requested is 
false)
//   Otherwise we would report unnecessary stubs even if the user 
runs just single test
// from the class
// 2. tests are successful (we 

[jira] [Commented] (KAFKA-14682) Unused stubbings are not reported by Mockito during CI builds

2023-08-04 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751178#comment-17751178
 ] 

Chris Egerton commented on KAFKA-14682:
---

I've taken a stab at an upstream Mockito fix. If it's accepted, we can either 
bump to a new version with that fix, or look into tweaking our Jenkinsfile to 
run the {{test}} task instead of {{unitTest integrationTest}}.

> Unused stubbings are not reported by Mockito during CI builds
> -
>
> Key: KAFKA-14682
> URL: https://issues.apache.org/jira/browse/KAFKA-14682
> Project: Kafka
>  Issue Type: Test
>  Components: unit tests
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> We've started using [strict 
> stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html]
>  for unit tests written with Mockito, which is supposed to automatically fail 
> tests when they set up mock expectations that go unused.
> However, these failures are not reported during Jenkins builds, even if they 
> are reported when building/testing locally.
> In at least one case, this difference appears to be because our [Jenkins 
> build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35]
>  uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the 
> project's [Gradle build 
> file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543],
>  instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the 
> latter instead of the former, which can cause tests to fail due to 
> unnecessary stubbings when being run in that IDE but not when being built on 
> Jenkins.
> It's possible that, because the custom test tasks filter out some tests from 
> running, Mockito does not check for unnecessary stubbings in order to avoid 
> incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} 
> method.
>  
> This exact behavior has been reported elsewhere as a [Gradle 
> issue|https://github.com/gradle/gradle/issues/10694]; based on [discussion on 
> that 
> thread|https://github.com/gradle/gradle/issues/10694#issuecomment-1374911274],
>  it appears this is a known and somewhat-intentional limitation of Mockito:
> {quote}I spent some time trying to solve this and eventually I stumbled upon 
> this piece in Mockito's JUnit runner:
> [https://github.com/mockito/mockito/blob/main/src/main/java/org/mockito/internal/runners/StrictRunner.java#L47-L53]
> // only report when:
> // 1. if all tests from given test have ran (filter requested is false)
> //   Otherwise we would report unnecessary stubs even if the user runs just single test
> // from the class
> // 2. tests are successful (we don't want to add an extra failure on top of any existing
> // failure, to avoid confusion)
>  
> (1) suggests that skipping unused stub validation is the intended behavior 
> when the user filters a single test from the class. However, this behavior 
> applies to any type of filter.
> And Gradle indeed applies a {{CategoryFilter}} if categories are configured: 
> [https://github.com/rieske/gradle/blob/e82029abb559d620fdfecb4708a95c6313af625c/subprojects/testing-jvm-infrastructure/src/main/java/org/gradle/api/internal/tasks/testing/junit/JUnitTestClassExecutor.java#L70-L96]
> Which then causes Mockito to not validate unused stubs.
> {quote}
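
The two conditions quoted above can be modelled in a few lines. This is a simplified sketch of the described decision only, not Mockito's actual code; {{StrictStubReportingSketch}} and its parameter names are made up for illustration.

```java
// Simplified model of the reporting rule described in the quoted comment.
// It is NOT Mockito's implementation; names here are hypothetical.
class StrictStubReportingSketch {
    // Unused stubs are reported only when no filter was requested for the
    // test class and all of its tests passed.
    static boolean shouldReportUnusedStubs(boolean filterRequested, boolean testsSuccessful) {
        return !filterRequested && testsSuccessful;
    }

    public static void main(String[] args) {
        // A run where the build tool applied a filter (any kind of filter).
        System.out.println("filtered run reports: " + shouldReportUnusedStubs(true, true));
        // A run of the whole class with no filter applied.
        System.out.println("unfiltered run reports: " + shouldReportUnusedStubs(false, true));
    }
}
```

Under this model, any run in which a filter is applied skips the unused-stub check, which matches the difference observed between the Jenkins build and the plain {{test}} task.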





[jira] [Assigned] (KAFKA-14682) Unused stubbings are not reported by Mockito during CI builds

2023-08-04 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-14682:
-

Assignee: Chris Egerton  (was: Christo Lolov)

> Unused stubbings are not reported by Mockito during CI builds
> -
>
> Key: KAFKA-14682
> URL: https://issues.apache.org/jira/browse/KAFKA-14682
> Project: Kafka
>  Issue Type: Test
>  Components: unit tests
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> We've started using [strict 
> stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html]
>  for unit tests written with Mockito, which is supposed to automatically fail 
> tests when they set up mock expectations that go unused.
> However, these failures are not reported during Jenkins builds, even if they 
> are reported when building/testing locally.
> In at least one case, this difference appears to be because our [Jenkins 
> build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35]
>  uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the 
> project's [Gradle build 
> file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543],
>  instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the 
> latter instead of the former, which can cause tests to fail due to 
> unnecessary stubbings when being run in that IDE but not when being built on 
> Jenkins.
> It's possible that, because the custom test tasks filter out some tests from 
> running, Mockito does not check for unnecessary stubbings in order to avoid 
> incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} 
> method.
>  
> This exact behavior has been reported elsewhere as a [Gradle 
> issue|https://github.com/gradle/gradle/issues/10694]; based on [discussion on 
> that 
> thread|https://github.com/gradle/gradle/issues/10694#issuecomment-1374911274],
>  it appears this is a known and somewhat-intentional limitation of Mockito:
> {quote}I spent some time trying to solve this and eventually I stumbled upon 
> this piece in Mockito's JUnit runner:
> [https://github.com/mockito/mockito/blob/main/src/main/java/org/mockito/internal/runners/StrictRunner.java#L47-L53]
> // only report when:
> // 1. if all tests from given test have ran (filter requested is false)
> //   Otherwise we would report unnecessary stubs even if the user runs just single test
> // from the class
> // 2. tests are successful (we don't want to add an extra failure on top of any existing
> // failure, to avoid confusion)
>  
> (1) suggests that skipping unused stub validation is the intended behavior 
> when the user filters a single test from the class. However, this behavior 
> applies to any type of filter.
> And Gradle indeed applies a {{CategoryFilter}} if categories are configured: 
> [https://github.com/rieske/gradle/blob/e82029abb559d620fdfecb4708a95c6313af625c/subprojects/testing-jvm-infrastructure/src/main/java/org/gradle/api/internal/tasks/testing/junit/JUnitTestClassExecutor.java#L70-L96]
> Which then causes Mockito to not validate unused stubs.
> {quote}





[jira] [Commented] (KAFKA-15298) Disable DeleteRecords on Tiered Storage topics

2023-08-04 Thread Christo Lolov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751172#comment-17751172
 ] 

Christo Lolov commented on KAFKA-15298:
---

Heya [~ckamal]! Thanks for pointing me to the pull request. I had not reviewed 
it so I wasn't aware of this particular change 
(https://github.com/apache/kafka/pull/13561/files#diff-10e27a71dc3dec3463df7752ab07f6227cf70bcee70a93a86b5984e020beae05L984)
 which I believe addresses my concern. Give me some time to review the pull 
request and I will close this Jira ticket if there is no need for it anymore.

> Disable DeleteRecords on Tiered Storage topics
> --
>
> Key: KAFKA-15298
> URL: https://issues.apache.org/jira/browse/KAFKA-15298
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>  Labels: tiered-storage
>
> Currently the DeleteRecords API does not work with Tiered Storage. We should 
> ensure that this is reflected in the responses that clients get when trying 
> to use the API with tiered topics.





[GitHub] [kafka] philipnee opened a new pull request, #14154: [TEST] Testing build failures for the commit

2023-08-04 Thread via GitHub


philipnee opened a new pull request, #14154:
URL: https://github.com/apache/kafka/pull/14154

   There are a lot of random build failures, and we aren't sure why.  Starting a 
fresh build to see if these build failures persist.




[GitHub] [kafka] bachmanity1 commented on pull request #14153: KAFKA-7438: Replace Easymock & Powermock with Mockito in KafkaBasedLogTest

2023-08-04 Thread via GitHub


bachmanity1 commented on PR #14153:
URL: https://github.com/apache/kafka/pull/14153#issuecomment-1665858277

   @C0urante @mjsax @cadonna could you please review this? 




[GitHub] [kafka] bachmanity1 opened a new pull request, #14153: KAFKA-7438: Replace Easymock & Powermock with Mockito in KafkaBasedLogTest

2023-08-04 Thread via GitHub


bachmanity1 opened a new pull request, #14153:
URL: https://github.com/apache/kafka/pull/14153

   Replaced Easymock & Powermock with Mockito in KafkaBasedLogTest.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




[jira] [Comment Edited] (KAFKA-15239) producerPerformance system test for old client failed after v3.5.0

2023-08-04 Thread Federico Valeri (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746622#comment-17746622
 ] 

Federico Valeri edited comment on KAFKA-15239 at 8/4/23 3:58 PM:
-

Hi [~showuon], thanks for raising this.

QuotaTest uses ProducerPerformanceService which is where the issue is located. 
This is the test instance that is failing.

{code}
[INFO:2023-07-24 05:00:40,057]: RunnerClient: Loading test {'directory': 
'/opt/kafka-dev/tests/kafkatest/tests/client', 'file_name': 'quota_test.py', 
'cls_name': 'QuotaTest', 'method_name': 'test_quota', 'injected_args': 
{'quota_type': 'client-id', 'old_client_throttling_behavior': True}}
...
Exception: No output from ProducerPerformance
{code}

If we look at ProducerPerformanceService stderr logs in ./results, we have the 
reported exception.

{code}
Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/kafka/common/utils/ThroughputThrottler
at 
org.apache.kafka.tools.ProducerPerformance.start(ProducerPerformance.java:101)
at 
org.apache.kafka.tools.ProducerPerformance.main(ProducerPerformance.java:52)
Caused by: java.lang.ClassNotFoundException: 
org.apache.kafka.common.utils.ThroughputThrottler
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 2 more
{code}

There are other system tests that use ProducerPerformanceService, and are 
failing for the same reason.

{code}
TC_PATHS="tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota" 
_DUCKTAPE_OPTIONS="--max-parallel=1 --deflake=1  --parameters 
'{\"quota_type\":\"client-id\",\"old_client_throttling_behavior\":true}'" bash 
tests/docker/run_tests.sh
TC_PATHS="tests/kafkatest/sanity_checks/test_performance_services.py::PerformanceServiceTest.test_version"
 _DUCKTAPE_OPTIONS="--max-parallel=1 --deflake=1 --parameters 
'{\"version\":\"0.8.2.2\",\"new_consumer\":false}'" bash 
tests/docker/run_tests.sh
TC_PATHS="tests/kafkatest/sanity_checks/test_performance_services.py::PerformanceServiceTest.test_version"
 _DUCKTAPE_OPTIONS="--max-parallel=1 --deflake=1 --parameters 
'{\"version\":\"0.9.0.1\"}'" bash tests/docker/run_tests.sh
TC_PATHS="tests/kafkatest/sanity_checks/test_performance_services.py::PerformanceServiceTest.test_version"
 _DUCKTAPE_OPTIONS="--max-parallel=1 --deflake=1 --parameters 
'{\"version\":\"0.9.0.1\",\"new_consumer\":false}'" bash 
tests/docker/run_tests.sh
TC_PATHS="tests/kafkatest/sanity_checks/test_performance_services.py::PerformanceServiceTest.test_version"
 _DUCKTAPE_OPTIONS="--max-parallel=1 --deflake=1 --parameters 
'{\"version\":\"1.1.1\",\"new_consumer\":false}'" bash tests/docker/run_tests.sh
{code}

The following commit broke all of them. QuotaTest worked at the 3.5.0 
revision and stopped working after cherry-picking this commit. This affects 
only trunk, so there is no need to mention it anywhere.

https://github.com/apache/kafka/commit/125dbb92867c11739afc54c5e546f551f70d7113

Working on a fix.







was (Author: fvaleri):
Hi [~showuon], thanks for raising this.

QuotaTest uses ProducerPerformanceService which is where the issue is located. 
This is the test instance that is failing.

{code}
[INFO:2023-07-24 05:00:40,057]: RunnerClient: Loading test {'directory': 
'/opt/kafka-dev/tests/kafkatest/tests/client', 'file_name': 'quota_test.py', 
'cls_name': 'QuotaTest', 'method_name': 'test_quota', 'injected_args': 
{'quota_type': 'client-id', 'old_client_throttling_behavior': True}}
...
Exception: No output from ProducerPerformance
{code}

If we look at ProducerPerformanceService stderr logs in ./results, we have the 
reported exception.

{code}
Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/kafka/common/utils/ThroughputThrottler
at 
org.apache.kafka.tools.ProducerPerformance.start(ProducerPerformance.java:101)
at 
org.apache.kafka.tools.ProducerPerformance.main(ProducerPerformance.java:52)
Caused by: java.lang.ClassNotFoundException: 
org.apache.kafka.common.utils.ThroughputThrottler
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 2 more
{code}

There are other system test instances that use ProducerPerformanceService and 
are failing for the same reason. This is the full list.

{code}
TC_PATHS="tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota" 
_DUCKTAPE_OPTIONS="--max-parallel=1 --deflake=1  --parameters 
'{\"quota_type\":\"client-id\",\"old_client_throttling_behavior\":true}'" bash 
tests/docker/run_tests.sh

[GitHub] [kafka] fvaleri commented on pull request #14092: KAFKA-15239: Fix system tests using producer performance service

2023-08-04 Thread via GitHub


fvaleri commented on PR #14092:
URL: https://github.com/apache/kafka/pull/14092#issuecomment-1665804677

   @gharris1727 I tried various things, as I'm also learning how this works. 
   
   The best solution I found is to move the ThroughputThrottler class from 
clients to server-common, adding this extra implementation dependency to 
connect:runtime (last commit on this branch). 
   
   Why server-common? AFAIU, server-common hosts all classes shared by 
different modules, tools and connect:runtime in this case. Unlike tools, I 
think it makes sense to have this dependency for connect:runtime, as it may be 
used for other classes in the future.
   
   That way, we won't move the compatibility boundary. System tests will 
continue to use ProducerPerformance from dev branch as before, without bringing 
in dev clients, but using the version under test. Let me know if you see any 
issue with this approach. Thanks.
   
   




[GitHub] [kafka] philipnee commented on pull request #14118: KAFKA-14875: Implement wakeup

2023-08-04 Thread via GitHub


philipnee commented on PR #14118:
URL: https://github.com/apache/kafka/pull/14118#issuecomment-1665733961

   I've been seeing stream test failures in the recent commits too!
   
   This PR needs to be rebased on top of 
https://github.com/apache/kafka/pull/14123 to resolve some of the failing 
committed tests. Sorry about that, @kirktrue.





[jira] [Updated] (KAFKA-14682) Unused stubbings are not reported by Mockito during CI builds

2023-08-04 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14682:
--
Description: 
We've started using [strict 
stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html]
 for unit tests written with Mockito, which is supposed to automatically fail 
tests when they set up mock expectations that go unused.

However, these failures are not reported during Jenkins builds, even if they 
are reported when building/testing locally.

In at least one case, this difference appears to be because our [Jenkins 
build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35]
 uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the 
project's [Gradle build 
file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543],
 instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the latter 
instead of the former, which can cause tests to fail due to unnecessary 
stubbings when being run in that IDE but not when being built on Jenkins.

It's possible that, because the custom test tasks filter out some tests from 
running, Mockito does not check for unnecessary stubbings in order to avoid 
incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} method.

 

This exact behavior has been reported elsewhere as a [Gradle 
issue|https://github.com/gradle/gradle/issues/10694]; based on [discussion on 
that 
thread|https://github.com/gradle/gradle/issues/10694#issuecomment-1374911274], 
it appears this is a known and somewhat-intentional limitation of Mockito:
{quote}I spent some time trying to solve this and eventually I stumbled upon 
this piece in Mockito's JUnit runner:

[https://github.com/mockito/mockito/blob/main/src/main/java/org/mockito/internal/runners/StrictRunner.java#L47-L53]
{{// only report when:}}
{{// 1. if all tests from given test have ran (filter requested is false)}}
{{//    Otherwise we would report unnecessary stubs even if the user runs just single test}}
{{//    from the class}}
{{// 2. tests are successful (we don't want to add an extra failure on top of any existing}}
{{//    failure, to avoid confusion)}}
 
(1) suggests that skipping unused stub validation is the intended behavior when 
the user filters a single test from the class. However, this behavior applies 
to any type of filter.

And Gradle indeed applies a {{CategoryFilter}} if categories are configured: 
[https://github.com/rieske/gradle/blob/e82029abb559d620fdfecb4708a95c6313af625c/subprojects/testing-jvm-infrastructure/src/main/java/org/gradle/api/internal/tasks/testing/junit/JUnitTestClassExecutor.java#L70-L96]

Which then causes Mockito to not validate unused stubs.
{quote}
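
The reporting rule quoted above can be restated as a small predicate. The following is only a simplified model of that rule for illustration; the class and method names are hypothetical and this is not Mockito's actual implementation.

```java
public class StrictStubReporting {

    /**
     * Simplified model of the quoted rule: unused stubbings are reported only
     * when no test filter was requested and every test passed.
     * Hypothetical helper for illustration, not Mockito code.
     */
    static boolean shouldReportUnusedStubs(boolean filterRequested, boolean allTestsPassed) {
        return !filterRequested && allTestsPassed;
    }

    public static void main(String[] args) {
        // Plain test task with no filter: unused stubs would be reported.
        System.out.println(shouldReportUnusedStubs(false, true));
        // Task that applies a category filter: the check is skipped.
        System.out.println(shouldReportUnusedStubs(true, true));
        // A test already failed: the check is skipped to avoid a second failure.
        System.out.println(shouldReportUnusedStubs(false, false));
    }
}
```

Under this model, any filter at all (not just a single-test filter) turns the check off, which matches the behaviour described in the quote.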

  was:
We've started using [strict 
stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html]
 for unit tests written with Mockito, which is supposed to automatically fail 
tests when they set up mock expectations that go unused.

However, these failures are not reported during Jenkins builds, even if they 
are reported when building/testing locally.

In at least one case, this difference appears to be because our [Jenkins 
build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35]
 uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the 
project's [Gradle build 
file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543],
 instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the latter 
instead of the former, which can cause tests to fail due to unnecessary 
stubbings when being run in that IDE but not when being built on Jenkins.

It's possible that, because the custom test tasks filter out some tests from 
running, Mockito does not check for unnecessary stubbings in order to avoid 
incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} method.


> Unused stubbings are not reported by Mockito during CI builds
> -
>
> Key: KAFKA-14682
> URL: https://issues.apache.org/jira/browse/KAFKA-14682
> Project: Kafka
>  Issue Type: Test
>  Components: unit tests
>Reporter: Chris Egerton
>Assignee: Christo Lolov
>Priority: Major
>
> We've started using [strict 
> stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html]
>  for unit tests written with Mockito, which is supposed to automatically fail 
> tests when they set up mock expectations that go unused.
> However, these failures are 

[jira] [Commented] (KAFKA-14682) Unused stubbings are not reported by Mockito during CI builds

2023-08-04 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751125#comment-17751125
 ] 

Chris Egerton commented on KAFKA-14682:
---

[~christo_lolov] were you still planning on working on this? If not, I can take 
over.

> Unused stubbings are not reported by Mockito during CI builds
> -
>
> Key: KAFKA-14682
> URL: https://issues.apache.org/jira/browse/KAFKA-14682
> Project: Kafka
>  Issue Type: Test
>  Components: unit tests
>Reporter: Chris Egerton
>Assignee: Christo Lolov
>Priority: Major
>
> We've started using [strict 
> stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html]
>  for unit tests written with Mockito, which is supposed to automatically fail 
> tests when they set up mock expectations that go unused.
> However, these failures are not reported during Jenkins builds, even if they 
> are reported when building/testing locally.
> In at least one case, this difference appears to be because our [Jenkins 
> build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35]
>  uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the 
> project's [Gradle build 
> file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543],
>  instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the 
> latter instead of the former, which can cause tests to fail due to 
> unnecessary stubbings when being run in that IDE but not when being built on 
> Jenkins.
> It's possible that, because the custom test tasks filter out some tests from 
> running, Mockito does not check for unnecessary stubbings in order to avoid 
> incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} 
> method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] satishd merged pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-04 Thread via GitHub


satishd merged PR #13984:
URL: https://github.com/apache/kafka/pull/13984





[GitHub] [kafka] satishd commented on pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-04 Thread via GitHub


satishd commented on PR #13984:
URL: https://github.com/apache/kafka/pull/13984#issuecomment-1665559532

   Right, there are a few unrelated test failures, merging it to trunk.





[jira] [Updated] (KAFKA-15303) Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema

2023-08-04 Thread Charles-Eddy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Charles-Eddy updated KAFKA-15303:
-
Attachment: image (1).png

> Foreign key joins no longer triggered by events on the right side of the join 
> after deployment with a new compatible Avro schema
> 
>
> Key: KAFKA-15303
> URL: https://issues.apache.org/jira/browse/KAFKA-15303
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Charles-Eddy
>Priority: Critical
> Attachments: image (1).png
>
>
> Hello everyone, I am currently working on a project that uses Kafka Streams 
> (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK.
> Our goal is to join offer information from our sellers with additional data 
> from various input topics, and then feed the resulting joined information 
> into an output topic.
> Our application is deployed in Kubernetes using the StatefulSet feature, with 
> one EBS volume per Kafka Streams pod and 5 Streams Threads per pod.
> We use Avro to serialize and deserialize the input topics and the values 
> kept in the Kafka Streams state stores.
> We have encountered a bug in Kafka Streams that prevents us from deploying 
> new versions of Kafka Streams containing new compatible Avro schemas of our 
> input topics.
> The symptom is that after deploying our new version, which contains no 
> changes in topology but only changes to the Avro schema used, we discard 
> every event coming from the right part of the join concerned by these Avro 
> schema changes until we receive something from the left part of the join.
> As a result, we are losing events and corrupting our output topics and stores 
> with outdated information.
> After checking the local help for the priority to assign, I have assigned it 
> as *CRITICAL* because we are losing data (for example, tombstones are not 
> propagated to the following joins, so some products are still visible on our 
> website when they should not be).
> Please feel free to change the priority if you think it is not appropriate.
>  
> *The bug:*
> After multiple hours of investigation we found out that the bug is located in 
> the foreign key join feature and specifically in this class: 
> *SubscriptionResolverJoinProcessorSupplier* in the left part of a foreign key 
> join. 
> The process(...) method of this class computes a hash by re-serializing the 
> deserialized value read from the left state store, and compares it with the 
> hash carried by the original message from the subscription-response-topic. 
>  
> This means that when we deploy a new version of our Kafka Streams instance 
> with a new compatible Avro schema on the left side of a join, every join 
> triggered by the right side is invalidated until we receive all the events 
> again on the left side. These joins are discarded because the hashes now 
> computed by Kafka Streams differ from the hashes of the original messages.
>  
> *How to reproduce it:*
> If we take a working and a non-working workflow, it will do something like 
> this:
> +Normal non-breaking+ workflow from the left part of the FK join:
>  # A new offer event occurs. The offer is received and stored (v1).
>  # A subscription registration is sent with the offer-hash (v1).
>  # The subscription is saved to the store with the v1 offer-hash.
>  # Product data is searched for.
>  # If product data is found, a subscription response is sent back, including 
> the v1 offer-hash.
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v1) and response event (also v1) are compared.
>  # Finally, the join result is sent.
> New product event from the right part of the FK join:
>  # The product is received and stored.
>  # All related offers in the registration store are searched for.
>  # A subscription response is sent for each offer, including their offer hash 
> (v1).
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v1) and response event (also v1) are compared.
>  # Finally, the join result is sent.
>  
> +Breaking workflow:+ 
> The offer serializer is changed to offer v2
> New product event from the right part of the FK join: 
>  # The product is received and stored.
>  # All related offers in the registration store are searched for.
>  # A subscription response is sent for each offer, including their offer hash 
> (v1).
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v2) and response event (still v1) are compared.
>  # The join result is not sent since the hash is different.
>  
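
The breaking workflow above can be sketched in plain Java. This is a minimal illustration under stated assumptions: the `Offer` class, the two stand-in serializers, and the use of SHA-256 are all hypothetical and chosen only to make the example self-contained; they are not the Avro serializers or the hash function Kafka Streams uses.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.function.Function;

public class ForeignKeyHashMismatch {

    /** Hypothetical offer value; the currency field is only written by schema v2. */
    static final class Offer {
        final String id;
        final String currency;

        Offer(String id, String currency) {
            this.id = id;
            this.currency = currency;
        }
    }

    // Stand-in serializers: v2 writes one extra field, so the bytes differ.
    static final Function<Offer, byte[]> SERIALIZER_V1 =
        o -> o.id.getBytes(StandardCharsets.UTF_8);
    static final Function<Offer, byte[]> SERIALIZER_V2 =
        o -> (o.id + "|" + o.currency).getBytes(StandardCharsets.UTF_8);

    /** SHA-256 is an assumption here, used only as a convenient hash. */
    static byte[] hash(byte[] serialized) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(serialized);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Models the comparison step: the join result is sent only if the hashes match. */
    static boolean joinResultEmitted(byte[] hashInResponse, Offer storedOffer,
                                     Function<Offer, byte[]> currentSerializer) {
        byte[] currentHash = hash(currentSerializer.apply(storedOffer));
        return Arrays.equals(hashInResponse, currentHash);
    }

    public static void main(String[] args) {
        Offer offer = new Offer("offer-1", "EUR");
        // Hash recorded with the subscription while serializer v1 was deployed.
        byte[] subscribedHash = hash(SERIALIZER_V1.apply(offer));

        // Same serializer on both sides: the hashes match and the join is sent.
        System.out.println(joinResultEmitted(subscribedHash, offer, SERIALIZER_V1));
        // After switching to v2 the recomputed hash differs and the join is dropped.
        System.out.println(joinResultEmitted(subscribedHash, offer, SERIALIZER_V2));
    }
}
```

The stored value is logically unchanged in both cases; only its serialized form differs, which is enough to make the comparison fail.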
> *Potential 

[jira] [Commented] (KAFKA-15267) Cluster-wide disablement of Tiered Storage

2023-08-04 Thread Christo Lolov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751099#comment-17751099
 ] 

Christo Lolov commented on KAFKA-15267:
---

Okay, I will send out an email later today

> Cluster-wide disablement of Tiered Storage
> --
>
> Key: KAFKA-15267
> URL: https://issues.apache.org/jira/browse/KAFKA-15267
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>  Labels: tiered-storage
>
> h2. Summary
> KIP-405 defines the configuration {{remote.log.storage.system.enable}} which 
> controls whether all resources needed for Tiered Storage to function are 
> instantiated properly in Kafka. However, the interaction between remote data 
> and Kafka is undefined if that configuration is set to false while there are 
> still topics with {{remote.storage.enable}} set. {color:#ff8b00}*We would 
> like to give customers the ability to switch off Tiered Storage on a cluster 
> level and as such would need to define the behaviour.*{color}
> {{remote.log.storage.system.enable}} is a read-only configuration. This means 
> that it can only be changed by *modifying the server.properties* and 
> restarting brokers. As such, the {*}validity of values contained in it is 
> only checked at broker startup{*}.
> This JIRA proposes a few behaviours and a recommendation on a way forward.
> h2. Option 1: Change nothing
> Pros:
>  * No operation.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
> h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster 
> level and do not allow it to be disabled
> Always instantiate all resources for tiered storage. If no special ones are 
> selected use the default ones which come with Kafka.
> Pros:
>  * We solve the problem for moving between versions not allowing TS to be 
> disabled.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
>  * We haven’t quantified how many compute resources (CPU, memory) idle TS 
> components occupy.
>  * TS is a feature not required for running Kafka. As such, while it is still 
> under development we shouldn’t put it on the critical path of starting a 
> broker. In this way, a stray memory leak won’t impact anything on the 
> critical path of a broker.
>  * We are potentially swapping one problem for another. How does TS behave if 
> one decides to swap the TS plugin classes when data has already been written?
> h2. Option 3: Hide topics with tiering enabled
> Customers cannot interact with topics which have tiering enabled. They cannot 
> create new topics with the same names. Retention (and compaction?) do not 
> take effect on files already in local storage.
> Pros:
>  * We do not force data-deletion.
> Cons:
>  * This will be quite involved - the controller will need to know when a 
> broker’s server.properties have been altered; the broker will need to not 
> proceed to delete logs it is not the leader or follower for.
> h2. {color:#00875a}Option 4: Do not start the broker if there are topics with 
> tiering enabled{color} - Recommended
> This option has 2 different sub-options. The first one is that TS cannot be 
> disabled on cluster-level if there are *any* tiering topics - in other words 
> all tiered topics need to be deleted. The second one is that TS cannot be 
> disabled on a cluster-level if there are *any* topics with *tiering enabled* 
> - they can have tiering disabled, but with a retention policy set to delete 
> or retain (as per 
> [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]).
>  A topic can have tiering disabled and remain on the cluster as long as there 
> is no *remote* data when TS is disabled cluster-wide.
> Pros:
>  * We force the customer to be very explicit in disabling tiering of topics 
> prior to disabling TS on the whole cluster.
> Cons:
>  * You have to make certain that all data in remote is deleted (just 
> disabling tiering on a topic is not enough). How do you determine whether all 
> remote data has expired if the policy is retain? If the retain policy in 
> KIP-950 knows that there is data in remote then this should also be able to 
> figure it out.
> The common denominator is that there needs to be no *remote* data at the 
> point of disabling TS. As such, the most straightforward option is to refuse 
> to start brokers if there are topics with {{remote.storage.enable}} 
> set. This in essence requires customers to clean any tiered topics before 
> switching off TS, which is a fair ask. Should we wish to revise this later it 
> should be possible.
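
A rough sketch of the startup check that Option 4 describes, in plain Java. All class and method names here are hypothetical, and topic configurations are modelled as simple maps; this is an assumption-laden illustration, not code from the Kafka broker.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TieredStorageStartupCheck {

    // Topic-level configuration named in the issue description.
    static final String TOPIC_TIERING_CONFIG = "remote.storage.enable";

    /** Returns the names of topics that still have tiering enabled, sorted. */
    static List<String> topicsWithTieringEnabled(Map<String, Map<String, String>> topicConfigs) {
        return topicConfigs.entrySet().stream()
            .filter(e -> Boolean.parseBoolean(
                e.getValue().getOrDefault(TOPIC_TIERING_CONFIG, "false")))
            .map(Map.Entry::getKey)
            .sorted()
            .collect(Collectors.toList());
    }

    /** Refuses to start when TS is off cluster-wide but some topic is still tiered. */
    static void validateOnStartup(boolean remoteLogStorageSystemEnable,
                                  Map<String, Map<String, String>> topicConfigs) {
        if (remoteLogStorageSystemEnable) {
            return; // Tiered Storage is on, nothing to validate.
        }
        List<String> offending = topicsWithTieringEnabled(topicConfigs);
        if (!offending.isEmpty()) {
            throw new IllegalStateException(
                "remote.log.storage.system.enable is false but tiering is still "
                    + "enabled on topics: " + offending);
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> topics = Map.of(
            "orders", Map.of(TOPIC_TIERING_CONFIG, "true"),
            "clicks", Map.<String, String>of());

        validateOnStartup(true, topics); // passes: TS is enabled cluster-wide
        try {
            validateOnStartup(false, topics);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing fast at startup keeps the decision explicit: the operator has to clean up or disable tiered topics before the cluster-level switch is accepted.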
> h2. Option 5: Make Kafka forget about all remote information
> Pros:
>  * Clean cut
> Cons:

[GitHub] [kafka] ivanyu commented on pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-04 Thread via GitHub


ivanyu commented on PR #13984:
URL: https://github.com/apache/kafka/pull/13984#issuecomment-1665493163

   Thank you @satishd 
   Seems the failures aren't related





[jira] [Commented] (KAFKA-15267) Cluster-wide disablement of Tiered Storage

2023-08-04 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751095#comment-17751095
 ] 

Luke Chen commented on KAFKA-15267:
---

(1) and (3) are good to me. The only concern is we don't have a KIP for adopting 
4.1. Could we at least start a discussion thread on the dev mailing list to see 
if there is any concern from the community?

> Cluster-wide disablement of Tiered Storage
> --
>
> Key: KAFKA-15267
> URL: https://issues.apache.org/jira/browse/KAFKA-15267
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>  Labels: tiered-storage
>
> h2. Summary
> KIP-405 defines the configuration {{remote.log.storage.system.enable}} which 
> controls whether all resources needed for Tiered Storage to function are 
> instantiated properly in Kafka. However, the interaction between remote data 
> and Kafka is undefined if that configuration is set to false while there are 
> still topics with {{remote.storage.enable}} set. {color:#ff8b00}*We would 
> like to give customers the ability to switch off Tiered Storage on a cluster 
> level and as such would need to define the behaviour.*{color}
> {{remote.log.storage.system.enable}} is a read-only configuration. This means 
> that it can only be changed by *modifying the server.properties* and 
> restarting brokers. As such, the {*}validity of values contained in it is 
> only checked at broker startup{*}.
> This JIRA proposes a few behaviours and a recommendation on a way forward.
> h2. Option 1: Change nothing
> Pros:
>  * No operation.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
> h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster 
> level and do not allow it to be disabled
> Always instantiate all resources for tiered storage. If no special ones are 
> selected use the default ones which come with Kafka.
> Pros:
>  * We solve the problem for moving between versions not allowing TS to be 
> disabled.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
>  * We haven’t quantified how many compute resources (CPU, memory) idle TS 
> components occupy.
>  * TS is a feature not required for running Kafka. As such, while it is still 
> under development we shouldn’t put it on the critical path of starting a 
> broker. In this way, a stray memory leak won’t impact anything on the 
> critical path of a broker.
>  * We are potentially swapping one problem for another. How does TS behave if 
> one decides to swap the TS plugin classes when data has already been written?
> h2. Option 3: Hide topics with tiering enabled
> Customers cannot interact with topics which have tiering enabled. They cannot 
> create new topics with the same names. Retention (and compaction?) do not 
> take effect on files already in local storage.
> Pros:
>  * We do not force data-deletion.
> Cons:
>  * This will be quite involved - the controller will need to know when a 
> broker’s server.properties have been altered; the broker will need to not 
> proceed to delete logs it is not the leader or follower for.
> h2. {color:#00875a}Option 4: Do not start the broker if there are topics with 
> tiering enabled{color} - Recommended
> This option has 2 different sub-options. The first one is that TS cannot be 
> disabled on cluster-level if there are *any* tiering topics - in other words 
> all tiered topics need to be deleted. The second one is that TS cannot be 
> disabled on a cluster-level if there are *any* topics with *tiering enabled* 
> - they can have tiering disabled, but with a retention policy set to delete 
> or retain (as per 
> [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]).
>  A topic can have tiering disabled and remain on the cluster as long as there 
> is no *remote* data when TS is disabled cluster-wide.
> Pros:
>  * We force the customer to be very explicit in disabling tiering of topics 
> prior to disabling TS on the whole cluster.
> Cons:
>  * You have to make certain that all data in remote is deleted (just 
> disabling tiering on a topic is not enough). How do you determine whether all 
> remote data has expired if the policy is retain? If the retain policy in 
> KIP-950 knows that there is data in remote then this should also be able to 
> figure it out.
> The common denominator is that there needs to be no *remote* data at the 
> point of disabling TS. As such, the most straightforward option is to refuse 
> to start brokers if there are topics with {{remote.storage.enable}} 
> set. This in essence requires customers to clean any tiered topics before 
> switching off TS, which is a fair ask. 

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-04 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1284280124


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -667,11 +675,323 @@ public void run() {
 }
 }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+            if (isLeader()) {
+                logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
+                updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+            }
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private final Optional<RetentionSizeData> retentionSizeData;
+            private final Optional<RetentionTimeData> retentionTimeData;
+
+            private long remainingBreachedSize;
+
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+                this.retentionSizeData = retentionSizeData;
+                this.retentionTimeData = retentionTimeData;
+                remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
+            }
+
+            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+                if (!retentionSizeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                    // Assumption that segments contain size >= 0
+                    if (remainingBreachedSize > 0) {
+                        long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes();
+                        if (remainingBytes >= 0) {
+                            remainingBreachedSize = remainingBytes;
+                            return true;
+                        }
+                    }
+
+                    return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
+                            metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize);
+                }
+                return isSegmentDeleted;
+            }
+
+            public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                if (!retentionTimeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
+                            metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+                if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // It removes the segments beyond the current leader's earliest epoch. Those segments are considered as
+            // unreferenced because they are not part of the current leader epoch lineage.
+            private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-04 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1284278935


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -2207,7 +2267,7 @@ case class RetentionSizeBreach(log: UnifiedLog) extends SegmentDeletionReason {
     var size = log.size
     toDelete.foreach { segment =>
       size -= segment.size
-      log.info(s"Deleting segment $segment due to retention size ${log.config.retentionSize} breach. Log size " +
+      log.info(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config)} breach. Local log size " +

Review Comment:
   Good catch.






[jira] [Commented] (KAFKA-15195) Regenerate segment-aligned producer snapshots when upgrading to a Kafka version supporting Tiered Storage

2023-08-04 Thread Christo Lolov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751078#comment-17751078
 ] 

Christo Lolov commented on KAFKA-15195:
---

Acknowledged, I will circle back on this once 3.6.0 makes it out!

> Regenerate segment-aligned producer snapshots when upgrading to a Kafka 
> version supporting Tiered Storage
> -
>
> Key: KAFKA-15195
> URL: https://issues.apache.org/jira/browse/KAFKA-15195
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 3.6.0
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
> Fix For: 3.6.0
>
>
> As mentioned in KIP-405: Kafka Tiered Storage#Upgrade, a customer wishing to 
> upgrade from a Kafka version < 2.8.0 to 3.6 and turn Tiered Storage on will 
> have to wait for retention to clean up segments without an associated 
> producer snapshot.
> However, in our experience, customers of Kafka expect to be able to 
> immediately enable tiering on a topic once their cluster upgrade is complete. 
> Once they do this, however, they start seeing NPEs and no data is uploaded to 
> Tiered Storage 
> (https://github.com/apache/kafka/blob/9e50f7cdd37f923cfef4711cf11c1c5271a0a6c7/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/LogSegmentData.java#L61).
> To achieve this, we propose changing Kafka to retroactively create producer 
> snapshot files on upload whenever a segment is due to be archived and lacks 
> one.





[jira] [Commented] (KAFKA-15267) Cluster-wide disablement of Tiered Storage

2023-08-04 Thread Christo Lolov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751077#comment-17751077
 ] 

Christo Lolov commented on KAFKA-15267:
---

Heya [~showuon]. Okay, I propose we do the following:
 * Mark this as a blocker for 3.6.0
 * I take on implementing 4.1 as a solution to unblocking 3.6.0 without a KIP
 * I take a follow-up task on proposing a KIP for 4.2 once KIP-950 makes it in 
+ implementing it

Thoughts?

> Cluster-wide disablement of Tiered Storage
> --
>
> Key: KAFKA-15267
> URL: https://issues.apache.org/jira/browse/KAFKA-15267
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>  Labels: tiered-storage
>
> h2. Summary
> KIP-405 defines the configuration {{remote.log.storage.system.enable}} which 
> controls whether all resources needed for Tiered Storage to function are 
> instantiated properly in Kafka. However, the interaction between remote data 
> and Kafka if that configuration is set to false while there are still topics 
> with {{remote.storage.enable}} set is undefined. {color:#ff8b00}*We would 
> like to give customers the ability to switch off Tiered Storage on a cluster 
> level and as such would need to define the behaviour.*{color}
> {{remote.log.storage.system.enable}} is a read-only configuration. This means 
> that it can only be changed by *modifying the server.properties* and 
> restarting brokers. As such, the {*}validity of values contained in it is 
> only checked at broker startup{*}.
> This JIRA proposes a few behaviours and a recommendation on a way forward.
> h2. Option 1: Change nothing
> Pros:
>  * No operation.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
> h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster 
> level and do not allow it to be disabled
> Always instantiate all resources for tiered storage. If no special ones are 
> selected use the default ones which come with Kafka.
> Pros:
>  * We solve the problem for moving between versions not allowing TS to be 
> disabled.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
>  * We haven’t quantified how many compute resources (CPU, memory) idle TS 
> components occupy.
>  * TS is a feature not required for running Kafka. As such, while it is still 
> under development we shouldn’t put it on the critical path of starting a 
> broker. In this way, a stray memory leak won’t impact anything on the 
> critical path of a broker.
>  * We are potentially swapping one problem for another. How does TS behave if 
> one decides to swap the TS plugin classes when data has already been written?
> h2. Option 3: Hide topics with tiering enabled
> Customers cannot interact with topics which have tiering enabled. They cannot 
> create new topics with the same names. Retention (and compaction?) do not 
> take effect on files already in local storage.
> Pros:
>  * We do not force data-deletion.
> Cons:
>  * This will be quite involved - the controller will need to know when a 
> broker’s server.properties have been altered; the broker will need to not 
> proceed to delete logs it is not the leader or follower for.
> h2. {color:#00875a}Option 4: Do not start the broker if there are topics with 
> tiering enabled{color} - Recommended
> This option has 2 different sub-options. The first one is that TS cannot be 
> disabled on cluster-level if there are *any* tiering topics - in other words 
> all tiered topics need to be deleted. The second one is that TS cannot be 
> disabled on a cluster-level if there are *any* topics with *tiering enabled* 
> - they can have tiering disabled, but with a retention policy set to delete 
> or retain (as per 
> [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]).
>  A topic can have tiering disabled and remain on the cluster as long as there 
> is no *remote* data when TS is disabled cluster-wide.
> Pros:
>  * We force the customer to be very explicit in disabling tiering of topics 
> prior to disabling TS on the whole cluster.
> Cons:
>  * You have to make certain that all data in remote storage is deleted (just 
> disabling a tiered topic is not enough). How do you determine whether all 
> remote data has expired if the policy is retain? If the retain policy in 
> KIP-950 knows that there is data in remote storage, then this should also be 
> able to figure it out.
> The common denominator is that there needs to be no *remote* data at the 
> point of disabling TS. As such, the most straightforward option is to refuse 
> to start brokers if there are topics with {{remote.storage.enable}} 
> present. This in essence requires 

[jira] [Comment Edited] (KAFKA-14038) Optimize calculation of size for log in remote tier

2023-08-04 Thread Christo Lolov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751076#comment-17751076
 ] 

Christo Lolov edited comment on KAFKA-14038 at 8/4/23 10:37 AM:


The remaining part of this ticket is blocked on merging 
https://github.com/apache/kafka/pull/13561


was (Author: christo_lolov):
The remaining part of this ticket is blocked on merging 
https://issues.apache.org/jira/browse/KAFKA-14038

> Optimize calculation of size for log in remote tier
> ---
>
> Key: KAFKA-14038
> URL: https://issues.apache.org/jira/browse/KAFKA-14038
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Divij Vaidya
>Assignee: Christo Lolov
>Priority: Major
>  Labels: KIP-405, needs-kip
> Fix For: 3.6.0
>
>
> {color:#24292f}As per the Tiered Storage feature introduced in 
> [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage],
>  users can configure the retention of remote tier based on time, by size, or 
> both. The work of computing the log segments to be deleted based on the 
> retention config is [owned by 
> RemoteLogManager|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-1.RemoteLogManager(RLM)ThreadPool]
>  (RLM).{color}
> {color:#24292f}To compute remote segments eligible for deletion based on 
> retention by size config, {color}RLM needs to compute the 
> {{total_remote_log_size}} i.e. the total size of logs available in the remote 
> tier for that topic-partition. RLM could use the 
> {{RemoteLogMetadataManager.listRemoteLogSegments()}} to fetch metadata for 
> all the remote segments and then aggregate the segment sizes by using 
> {{RemoteLogSegmentMetadata.segmentSizeInBytes()}} to find the total log 
> size stored in the remote tier.
> The above method involves iterating through all metadata of all the segments 
> i.e. O({color:#24292f}num_remote_segments{color}) on each execution of RLM 
> thread. {color:#24292f}Since the main feature of tiered storage is storing a 
> large amount of data, we expect num_remote_segments to be large and a 
> frequent linear scan could be expensive (depending on the underlying storage 
> used by RemoteLogMetadataManager).
> Segment offloads and segment deletions are run together in the same task and 
> a fixed size thread pool is shared among all topic-partitions. Slow logic 
> for calculating total_log_size could result in a loss of availability, as 
> demonstrated in the following scenario:{color}
>  # {color:#24292f}Calculation of total_size is slow and the threads in the 
> thread pool are busy with segment deletions{color}
>  # Segment offloads are delayed (since they run together with deletions)
>  # Local disk fills up, since local deletion requires the segment to be 
> offloaded
>  # If local disk is completely full, Kafka fails
> Details are in KIP - 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-852%3A+Optimize+calculation+of+size+for+log+in+remote+tier]
>  
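
For illustration, the linear scan described in the issue above could look roughly like the sketch below. `SegmentMetadata` is a hypothetical stand-in that models only the size accessor; it is not the real `RemoteLogSegmentMetadata` class, and the iterator merely stands in for whatever `listRemoteLogSegments()` returns.

```java
import java.util.Iterator;
import java.util.List;

public class RemoteLogSizeSketch {

    // Hypothetical stand-in for remote segment metadata; only the size is modeled.
    static final class SegmentMetadata {
        private final int segmentSizeInBytes;

        SegmentMetadata(int segmentSizeInBytes) {
            this.segmentSizeInBytes = segmentSizeInBytes;
        }

        int segmentSizeInBytes() {
            return segmentSizeInBytes;
        }
    }

    // Linear scan: visits every segment's metadata, so each call costs
    // O(num_remote_segments).
    static long totalRemoteLogSize(Iterator<SegmentMetadata> segments) {
        long total = 0L;
        while (segments.hasNext()) {
            total += segments.next().segmentSizeInBytes();
        }
        return total;
    }

    public static void main(String[] args) {
        List<SegmentMetadata> segments = List.of(
            new SegmentMetadata(1024),
            new SegmentMetadata(2048),
            new SegmentMetadata(512));
        System.out.println(totalRemoteLogSize(segments.iterator())); // prints 3584
    }
}
```

Because the whole scan repeats on every run of the RLM task, its cost grows with the number of remote segments, which is the concern the issue raises.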





[jira] [Assigned] (KAFKA-14038) Optimize calculation of size for log in remote tier

2023-08-04 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov reassigned KAFKA-14038:
-

Assignee: Christo Lolov  (was: Divij Vaidya)

> Optimize calculation of size for log in remote tier
> ---
>
> Key: KAFKA-14038
> URL: https://issues.apache.org/jira/browse/KAFKA-14038
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Divij Vaidya
>Assignee: Christo Lolov
>Priority: Major
>  Labels: KIP-405, needs-kip
> Fix For: 3.6.0
>
>





[jira] [Commented] (KAFKA-14038) Optimize calculation of size for log in remote tier

2023-08-04 Thread Christo Lolov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751076#comment-17751076
 ] 

Christo Lolov commented on KAFKA-14038:
---

The remaining part of this ticket is blocked on merging 
https://issues.apache.org/jira/browse/KAFKA-14038

> Optimize calculation of size for log in remote tier
> ---
>
> Key: KAFKA-14038
> URL: https://issues.apache.org/jira/browse/KAFKA-14038
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Divij Vaidya
>Assignee: Divij Vaidya
>Priority: Major
>  Labels: KIP-405, needs-kip
> Fix For: 3.6.0
>
>





[GitHub] [kafka] fvaleri commented on pull request #14092: KAFKA-15239: Fix system tests using producer performance service

2023-08-04 Thread via GitHub


fvaleri commented on PR #14092:
URL: https://github.com/apache/kafka/pull/14092#issuecomment-1665393332

   > So for example, if the node.version is 1.0.0, this condition will be true, 
and the script will add DEV jars to CLASSPATH. kafka-run-class.sh then finds 
the 1.0.0 jars and adds them to CLASSPATH.
   
   This shows my lack of knowledge on Kafka STs. I see it now. Thanks.
   
   > In https://github.com/apache/kafka/pull/13313 I explored eliminating the 
artifact mixing, but it turned out that the 0.8.x tools was missing 
functionality, and so I had to inject the 0.9.x tools jar. I think we can do 
something similar here, try to remove the artifact mixing, and if that fails, 
find an intermediate version that can be injected instead of trunk.
   
   Yeah, I'll try to do that and give an update.
   





[jira] [Updated] (KAFKA-15303) Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema

2023-08-04 Thread Charles-Eddy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Charles-Eddy updated KAFKA-15303:
-
Description: 
Hello everyone, I am currently working on a project that uses Kafka Streams 
(version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK.
Our goal is to join offer information from our sellers with additional data 
from various input topics, and then feed the resulting joined information into 
an output topic.

Our application is deployed in Kubernetes using the StatefulSet feature, with 
one EBS volume per Kafka Streams pod and 5 Streams Threads per pod.

We use Avro to serialize and deserialize the input topics and the values 
stored in the Kafka Streams state stores.

We have encountered a bug in Kafka Streams that prevents us from deploying new 
versions of Kafka Streams containing new compatible Avro schemas of our input 
topics.

The symptom is that after deploying our new version, which contains no changes 
in topology but only changes to the Avro schema used, we discard every event 
coming from the right part of the join concerned by these Avro schema changes 
until we receive something from the left part of the join.

As a result, we are losing events and corrupting our output topics and stores 
with outdated information.

After checking the local help for the priority to assign, I have assigned it as 
*CRITICAL* because we are losing data (for example, tombstones are not 
propagated to the following joins, so some products are still visible on our 
website when they should not be).

Please feel free to change the priority if you think it is not appropriate.

 

*The bug:*
After several hours of investigation, we found that the bug is located in 
the foreign key join feature, specifically in the 
*SubscriptionResolverJoinProcessorSupplier* class on the left side of a foreign 
key join. 
The process(...) method of this class computes a hash by re-serializing the 
deserialized value read from the left state store and compares it with the 
hash of the original message carried in the subscription-response-topic. 
 
This means that when we deploy a new version of our Kafka Streams instance with 
a new compatible Avro schema on the left side of a join, every join triggered 
by the right side is invalidated until we receive all the events again on the 
left side. These joins are discarded because the hashes computed by Kafka 
Streams now differ from those of the original messages.
 
*How to reproduce it:*
If we take a working and a non-working workflow, it will do something like this:
+Normal non-breaking+ workflow from the left part of the FK join:
 # A new offer event occurs. The offer is received and stored (v1).
 # A subscription registration is sent with the offer-hash (v1).
 # The subscription is saved to the store with the v1 offer-hash.
 # Product data is searched for.
 # If product data is found, a subscription response is sent back, including 
the v1 offer-hash.
 # The offer data in the store is searched for and the offer hashes between the 
store (v1) and response event (also v1) are compared.
 # Finally, the join result is sent.

New product event from the right part of the FK join:
 # The product is received and stored.
 # All related offers in the registration store are searched for.
 # A subscription response is sent for each offer, including their offer hash 
(v1).
 # The offer data in the store is searched for and the offer hashes between the 
store (v1) and response event (also v1) are compared.
 # Finally, the join result is sent.

 
+Breaking workflow:+ 
The offer serializer is changed to offer v2
New product event from the right part of the FK join: 
 # The product is received and stored.
 # All related offers in the registration store are searched for.
 # A subscription response is sent for each offer, including their offer hash 
(v1).
 # The offer data in the store is searched for and the offer hashes between the 
store (v2) and response event (still v1) are compared.
 # The join result is not sent since the hash is different.

 

*Potential Fix:*
Currently, the offer’s hash is computed from the serialization of deserialized 
offer data. This means that binary offer data v1 is deserialized into a v2 data 
class using a v1-compatible deserializer, and then serialized into binary v2 
data.
As a result, we are comparing a v2 hash (from the store) with a v1 hash (from 
the response event).
Is it possible to avoid this binary v1 -> data class v2 -> binary v2 step by 
directly retrieving the bytes from the store to compute the hash?
This way, the hash would be the same.
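
To make the mismatch concrete, here is a minimal, self-contained sketch of the round trip described above. Everything in it is an assumption made for illustration: the toy `v1|...` / `v2|...` wire formats stand in for Avro, the `Offer` class and the default currency are hypothetical, and SHA-256 is used only because it ships with the JDK (it is not claimed to be the hash Kafka Streams uses).

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

public class FkJoinHashSketch {

    // Hypothetical v2 data class: v2 adds a "currency" field with a default value.
    static final class Offer {
        final String id;
        final long price;
        final String currency;

        Offer(String id, long price, String currency) {
            this.id = id;
            this.price = price;
            this.currency = currency;
        }
    }

    // Toy v1 wire format: "v1|id|price".
    static byte[] serializeV1(String id, long price) {
        return ("v1|" + id + "|" + price).getBytes(StandardCharsets.UTF_8);
    }

    // Toy v2 wire format: "v2|id|price|currency".
    static byte[] serializeV2(Offer offer) {
        return ("v2|" + offer.id + "|" + offer.price + "|" + offer.currency)
            .getBytes(StandardCharsets.UTF_8);
    }

    // Backward-compatible reader: fills in the default currency for v1 bytes.
    static Offer deserialize(byte[] bytes) {
        String[] parts = new String(bytes, StandardCharsets.UTF_8).split("\\|");
        String currency = parts.length > 3 ? parts[3] : "EUR";
        return new Offer(parts[1], Long.parseLong(parts[2]), currency);
    }

    static byte[] hash(byte[] bytes) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(bytes);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Before the deployment: the offer is stored as v1 bytes and its hash
        // travels with the subscription.
        byte[] storedBytes = serializeV1("offer-1", 100L);
        byte[] responseHash = hash(storedBytes);

        // After the deployment: binary v1 -> data class v2 -> binary v2.
        byte[] reserialized = serializeV2(deserialize(storedBytes));
        System.out.println(Arrays.equals(hash(reserialized), responseHash)); // prints false

        // Hashing the raw stored bytes instead keeps the comparison stable.
        System.out.println(Arrays.equals(hash(storedBytes), responseHash)); // prints true
    }
}
```

The second comparison corresponds to the potential fix suggested above: hash the bytes as they sit in the store rather than a re-serialized copy.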
 
Thanks.

  was:
Hello everyone, I am currently working on a project that uses Kafka Streams 
(version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK.
Our goal is to join offer information from our 

[GitHub] [kafka] hudeqi commented on pull request #13929: KAFKA-15129;[3/N] Remove metrics in AbstractFetcherManager when fetcher manager instance shutdown

2023-08-04 Thread via GitHub


hudeqi commented on PR #13929:
URL: https://github.com/apache/kafka/pull/13929#issuecomment-1665378793

   > I still need to get 3.5.1 out and focus on KIP-405 PRs (targeting 3.6 
release) before I get some more time @hudeqi!
   
   Hi, do you have time to review this PR now? I want to combine all the 
currently opened PRs about metric leaks into one PR. I don’t know if this will 
help or burden your review? @divijvaidya 





[jira] [Created] (KAFKA-15303) Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema

2023-08-04 Thread Charles-Eddy (Jira)
Charles-Eddy created KAFKA-15303:


 Summary: Foreign key joins no longer triggered by events on the 
right side of the join after deployment with a new compatible Avro schema
 Key: KAFKA-15303
 URL: https://issues.apache.org/jira/browse/KAFKA-15303
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.4.0
Reporter: Charles-Eddy


Hello everyone, I am currently working on a project that uses Kafka Streams 
(version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK.
Our goal is to join offer information from our sellers with additional data 
from various input topics, and then feed the resulting joined information into 
an output topic.

Our application is deployed in Kubernetes using the StatefulSet feature, with 
one EBS volume per Kafka Streams pod and 5 Streams Threads per pod.

We use Avro to serialize and deserialize the input topics and the values 
stored in the Kafka Streams state stores.

We have encountered a bug in Kafka Streams that prevents us from deploying new 
versions of Kafka Streams containing new compatible Avro schemas of our input 
topics.

The symptom is that after deploying our new version, which contains no changes 
in topology but only changes to the Avro schema used, we discard every event 
coming from the right part of the join concerned by these Avro schema changes 
until we receive something from the left part of the join.

As a result, we are losing events and corrupting our output topics and stores 
with outdated information.

After checking the local help for the priority to assign, I have assigned it as 
*CRITICAL* because we are losing data (for example, tombstones are not 
propagated to the following joins, so some products are still visible on our 
website when they should not be).

Please feel free to change the priority if you think it is not appropriate.

 

*The bug:*
After several hours of investigation, we found that the bug is located in 
the foreign key join feature, specifically in the 
*SubscriptionResolverJoinProcessorSupplier* class on the left side of a foreign 
key join. 
The process(...) method of this class computes a hash by re-serializing the 
deserialized value read from the left state store and compares it with the 
hash of the original message carried in the subscription-response-topic. 
 
This means that when we deploy a new version of our Kafka Streams instance with 
a new compatible Avro schema on the left side of a join, every join triggered 
by the right side is invalidated until we receive all the events again on the 
left side. These joins are discarded because the hashes computed by Kafka 
Streams now differ from those of the original messages.
 
*How to reproduce it:*
If we take a working and a non-working workflow, it will do something like this:
+Normal non-breaking+ workflow from the left part of the FK join:
 # A new offer event occurs. The offer is received and stored (v1).
 # A subscription registration is sent with the offer-hash (v1).
 # The subscription is saved to the store with the v1 offer-hash.
 # Product data is searched for.
 # If product data is found, a subscription response is sent back, including 
the v1 offer-hash.
 # The offer data in the store is searched for and the offer hashes between the 
store (v1) and response event (also v1) are compared.
 # Finally, the join result is sent.

New product event from the right part of the FK join:
 # The product is received and stored.
 # All related offers in the registration store are searched for.
 # A subscription response is sent for each offer, including their offer hash 
(v1).
 # The offer data in the store is searched for and the offer hashes between the 
store (v1) and response event (also v1) are compared.
 # Finally, the join result is sent.

 
+Breaking workflow:+ 
The offer serializer is changed to offer v2
New product event from the right part of the FK join:
 # The product is received and stored.
 # All related offers in the registration store are searched for.
 # A subscription response is sent for each offer, including their offer hash 
(v1).
 # The offer data in the store is searched for and the offer hashes between the 
store (v2) and response event (still v1) are compared.
 # The join result is not sent since the hash is different.

 

*Potential Fix:*
Currently, the offer’s hash is computed from the serialization of deserialized 
offer data. This means that binary offer data v1 is deserialized into a v2 data 
class using a v1-compatible deserializer, and then serialized into binary v2 
data.
As a result, we are comparing a v2 hash (from the store) with a v1 hash (from 
the response event).
Is it possible to avoid this binary v1 -> data class v2 -> binary v2 step by 
directly retrieving the bytes from the store to compute the 

[GitHub] [kafka] yashmayya commented on pull request #14152: KAFKA-7438: Migrate WindowStoreBuilderTest from EasyMock to Mockito

2023-08-04 Thread via GitHub


yashmayya commented on PR #14152:
URL: https://github.com/apache/kafka/pull/14152#issuecomment-1665377020

   @cadonna @mjsax would you be able to take a look at this whenever you get a 
chance?





[GitHub] [kafka] yashmayya commented on a diff in pull request #14152: KAFKA-7438: Migrate WindowStoreBuilderTest from EasyMock to Mockito

2023-08-04 Thread via GitHub


yashmayya commented on code in PR #14152:
URL: https://github.com/apache/kafka/pull/14152#discussion_r1284245654


##
streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java:
##
@@ -164,17 +160,6 @@ public void shouldThrowNullPointerIfInnerIsNull() {
 assertThrows(NullPointerException.class, () -> new 
WindowStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime()));
 }
 
-@Test
-public void shouldThrowNullPointerIfKeySerdeIsNull() {
-assertThrows(NullPointerException.class, () -> new 
WindowStoreBuilder<>(supplier, null, Serdes.String(), new MockTime()));
-}
-
-@Test
-public void shouldThrowNullPointerIfValueSerdeIsNull() {
-assertThrows(NullPointerException.class, () -> new 
WindowStoreBuilder<>(supplier, Serdes.String(),
-null, new MockTime()));
-}

Review Comment:
   These tests were buggy - there aren't currently any `null` checks in place 
for key / value serdes (or at least not in the builder). The only reason they 
were passing earlier is due to the use of `EasyMock` with ["nice" 
mocks](https://easymock.org/user-guide.html#mocking-nice). 
   
   In the `setUp` method, the expectation for the 
`WindowBytesStoreSupplier::name` method is set up - 
https://github.com/apache/kafka/blob/7782741262c08e5735f7c8e09727ec37cb5f7f02/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java#L61
   
By default, this method stub covers only the first invocation of the method 
and subsequent invocations will result in a `null` value being returned 
(because the mock object is "nice"). The first invocation occurs in the `setUp` 
method itself when the builder is instantiated - 
https://github.com/apache/kafka/blob/7782741262c08e5735f7c8e09727ec37cb5f7f02/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java#L65-L70
   
https://github.com/apache/kafka/blob/7782741262c08e5735f7c8e09727ec37cb5f7f02/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java#L39
   
   The `null` check for name here - 
https://github.com/apache/kafka/blob/7782741262c08e5735f7c8e09727ec37cb5f7f02/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java#L41
 
   is what resulted in the `NullPointerException` being thrown in the 
`shouldThrowNullPointerIfKeySerdeIsNull` and 
`shouldThrowNullPointerIfValueSerdeIsNull` tests.
   
   This test bug was discovered because method stubbings in `Mockito` are 
applied to any number of invocations of the method by default.
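
   The mechanism described above can be reproduced without either mocking library. The sketch below is a hand-rolled simulation, not EasyMock or Mockito API: `OneShotStub`, `PersistentStub` and `Builder` are hypothetical stand-ins, and `Builder` assumes (as stated above) that only the name is null-checked, not the serdes.

```java
import java.util.Objects;
import java.util.function.Supplier;

public class StubSemanticsSketch {

    // Answers the stubbed value once, then returns null on later calls,
    // mimicking the one-invocation stub on a "nice" mock described above.
    static final class OneShotStub implements Supplier<String> {
        private String value;

        OneShotStub(String value) {
            this.value = value;
        }

        @Override
        public String get() {
            String answer = value;
            value = null;
            return answer;
        }
    }

    // Answers the same value on every call, mimicking a stub that applies
    // to any number of invocations.
    static final class PersistentStub implements Supplier<String> {
        private final String value;

        PersistentStub(String value) {
            this.value = value;
        }

        @Override
        public String get() {
            return value;
        }
    }

    // Hypothetical builder: null-checks the name only, never the serdes.
    static final class Builder {
        Builder(Supplier<String> nameSupplier, Object keySerde, Object valueSerde) {
            Objects.requireNonNull(nameSupplier.get(), "name cannot be null");
        }
    }

    static boolean throwsNpe(Runnable action) {
        try {
            action.run();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Supplier<String> oneShot = new OneShotStub("name");
        new Builder(oneShot, "keySerde", "valueSerde"); // setUp uses up the only stubbed answer
        // The NPE comes from the null name, not from the null key serde.
        System.out.println(throwsNpe(() -> new Builder(oneShot, null, "valueSerde"))); // prints true

        Supplier<String> persistent = new PersistentStub("name");
        new Builder(persistent, "keySerde", "valueSerde");
        // With a persistent stub the name is never null, so nothing is thrown.
        System.out.println(throwsNpe(() -> new Builder(persistent, null, "valueSerde"))); // prints false
    }
}
```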








[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-04 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1284241897


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -152,16 +152,42 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   locally {
 initializePartitionMetadata()
 updateLogStartOffset(logStartOffset)
+updateLocalLogStartOffset(math.max(logStartOffset, 
localLog.segments.firstSegmentBaseOffset.getOrElse(0L)))
+if (!remoteLogEnabled())
+  logStartOffset = localLogStartOffset()
 maybeIncrementFirstUnstableOffset()
 initializeTopicId()
 
 
logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset)
+
+info(s"Completed load of log with ${localLog.segments.numberOfSegments} 
segments, local log start offset ${localLogStartOffset()} and " +
+  s"log end offset $logEndOffset")
   }
 
   def setLogOffsetsListener(listener: LogOffsetsListener): Unit = {
 logOffsetsListener = listener
   }
 
+  private def updateLocalLogStartOffset(offset: Long): Unit = {

Review Comment:
   This is not required as the updated code does not use this method. 






[GitHub] [kafka] yashmayya opened a new pull request, #14152: KAFKA-7438: Migrate WindowStoreBuilderTest from EasyMock to Mockito

2023-08-04 Thread via GitHub


yashmayya opened a new pull request, #14152:
URL: https://github.com/apache/kafka/pull/14152

   - Part of the larger effort to replace the usage of `EasyMock` and 
`PowerMock` with `Mockito` across the project that has been underway for the 
last few years (see https://issues.apache.org/jira/browse/KAFKA-7438)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] showuon opened a new pull request, #14151: KAFKA-15083: add config with "remote.log.metadata" prefix

2023-08-04 Thread via GitHub


showuon opened a new pull request, #14151:
URL: https://github.com/apache/kafka/pull/14151

   When configuring RLMM, the configs passed into the `configure` method are the 
`RemoteLogManagerConfig`. But `RemoteLogManagerConfig` contains no configs 
related to `remote.log.metadata.*`, e.g. 
`remote.log.metadata.topic.replication.factor`. So even if users have set the 
config in the broker, it will never be applied.
   
   This PR fixes the issue by allowing users to pass `remote.log.metadata.*` 
configs into RLMM, including the 
`remote.log.metadata.common.client.`/`remote.log.metadata.producer.`/ 
`remote.log.metadata.consumer.` prefixes.
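
   As a rough illustration of what forwarding these configs involves, here is a minimal plain-Java sketch that selects the broker properties starting with `remote.log.metadata.`. It is not the code from this PR: the class and method names are hypothetical, and leaving the keys unstripped is an assumption, since the description does not say whether the prefix is removed before the map reaches `configure`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; not part of Kafka. */
class PrefixedConfigSketch {
    static final String PREFIX = "remote.log.metadata.";

    /** Returns every entry whose key starts with the prefix; keys are left unchanged (assumption). */
    static Map<String, Object> withPrefix(Map<String, ?> brokerProps, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> entry : brokerProps.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> brokerProps = new HashMap<>();
        brokerProps.put("remote.log.metadata.topic.replication.factor", 3);
        brokerProps.put("log.retention.bytes", 1024L);
        // Only the remote.log.metadata.* entry would be handed on to RLMM.
        System.out.println(withPrefix(brokerProps, PREFIX));
    }
}
```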
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] satishd commented on pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-04 Thread via GitHub


satishd commented on PR #13984:
URL: https://github.com/apache/kafka/pull/13984#issuecomment-1665334207

   Will merge if the Jenkins jobs do not fail with any related tests.





[jira] [Assigned] (KAFKA-15295) Add config validation when remote storage is enabled on a topic

2023-08-04 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-15295:


Assignee: Luke Chen  (was: Kamal Chandraprakash)

> Add config validation when remote storage is enabled on a topic
> ---
>
> Key: KAFKA-15295
> URL: https://issues.apache.org/jira/browse/KAFKA-15295
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kamal Chandraprakash
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.6.0
>
>
> If system level remote storage is not enabled, then enabling remote storage 
> on a topic should throw exception while validating the configs. 
> See https://github.com/apache/kafka/pull/14114#discussion_r1280372441 for 
> more details
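
The requested check can be sketched as follows. This is a minimal illustration under stated assumptions, not the validation that was eventually merged: the class and method names are hypothetical, `IllegalArgumentException` stands in for whatever exception type the real validator throws, and the broker-level flag is passed in as a plain boolean representing `remote.log.storage.system.enable`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical validator; not part of Kafka. */
class RemoteStorageValidationSketch {
    static final String TOPIC_REMOTE_STORAGE_ENABLE = "remote.storage.enable";

    /** Rejects topic configs that enable remote storage while the system-level switch is off. */
    static void validate(Map<String, String> topicProps, boolean systemRemoteStorageEnabled) {
        boolean topicEnabled =
            Boolean.parseBoolean(topicProps.getOrDefault(TOPIC_REMOTE_STORAGE_ENABLE, "false"));
        if (topicEnabled && !systemRemoteStorageEnabled) {
            throw new IllegalArgumentException(
                "Remote storage cannot be enabled on a topic when it is disabled at the system level");
        }
    }

    public static void main(String[] args) {
        Map<String, String> topicProps = new HashMap<>();
        topicProps.put(TOPIC_REMOTE_STORAGE_ENABLE, "true");
        validate(topicProps, true); // accepted: the system-level switch is on
        try {
            validate(topicProps, false);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```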



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15295) Add config validation when remote storage is enabled on a topic

2023-08-04 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751054#comment-17751054
 ] 

Kamal Chandraprakash commented on KAFKA-15295:
--

[~showuon] 

I'm not working on this one currently. Thanks for taking it forward!

> Add config validation when remote storage is enabled on a topic
> ---
>
> Key: KAFKA-15295
> URL: https://issues.apache.org/jira/browse/KAFKA-15295
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
> Fix For: 3.6.0
>
>
> If system level remote storage is not enabled, then enabling remote storage 
> on a topic should throw exception while validating the configs. 
> See https://github.com/apache/kafka/pull/14114#discussion_r1280372441 for 
> more details





[jira] [Resolved] (KAFKA-14598) Fix flaky ConnectRestApiTest

2023-08-04 Thread Ashwin Pankaj (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashwin Pankaj resolved KAFKA-14598.
---
Resolution: Fixed

Did not observe this recently

> Fix flaky ConnectRestApiTest
> 
>
> Key: KAFKA-14598
> URL: https://issues.apache.org/jira/browse/KAFKA-14598
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ashwin Pankaj
>Assignee: Ashwin Pankaj
>Priority: Minor
>  Labels: flaky-test
>
> ConnectRestApiTest sometimes fails with the message
> {{ConnectRestError(404, '\n\n content="text/html;charset=ISO-8859-1"/>\nError 404 Not 
> Found\n\nHTTP ERROR 404 Not 
> Found\n\nURI:/connector-plugins/\nSTATUS:404\nMESSAGE:Not
>  
> Found\nSERVLET:-\n\n\n\n\n',
>  'http://172.31.1.75:8083/connector-plugins/')}}
> This happens because ConnectDistributedService.start() by default waits till 
> the line
> {{Joined group at generation ..}} is visible in the logs.
> In most cases this is sufficient. But in the cases where the test fails, we 
> see that this message appears even before Connect RestServer has finished 
> initialization.
>  {quote}   - [2022-12-15 15:40:29,064] INFO [Worker clientId=connect-1, 
> groupId=connect-cluster] Joined group at generation 2 with protocol version 1 
> and got assignment: Assignment{error=0, 
> leader='connect-1-07d9da63-9acb-4633-aee4-1ab79f4ab1ae', 
> leaderUrl='http://worker34:8083/', offset=-1, connectorIds=[], taskIds=[], 
> revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> - [2022-12-15 15:40:29,560] INFO 172.31.5.66 - - [15/Dec/2022:15:40:29 
> +] "GET /connector-plugins/ HTTP/1.1" 404 375 "-" 
> "python-requests/2.24.0" 71 (org.apache.kafka.connect.runtime.rest.RestServer)
> - [2022-12-15 15:40:29,579] INFO REST resources initialized; server is 
> started and ready to handle requests 
> (org.apache.kafka.connect.runtime.rest.RestServer)
> {quote}
>  
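
The race in the quoted log can be made concrete with a small sketch. It is only an illustration of the ordering problem, assuming readiness is judged from log lines alone; the issue was resolved without a recorded fix, and the class, method and constant names below are hypothetical (the actual system test is not written in Java).

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical readiness checks over captured worker log lines. */
class ConnectReadinessSketch {
    static final String JOINED_GROUP = "Joined group at generation";
    static final String REST_READY = "REST resources initialized";

    static boolean contains(List<String> logLines, String marker) {
        return logLines.stream().anyMatch(line -> line.contains(marker));
    }

    /** The check described in the issue: only the group-join line is awaited. */
    static boolean joinedGroup(List<String> logLines) {
        return contains(logLines, JOINED_GROUP);
    }

    /** A stricter check that also waits for the REST server to report readiness. */
    static boolean readyForRestCalls(List<String> logLines) {
        return contains(logLines, JOINED_GROUP) && contains(logLines, REST_READY);
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList(
            "INFO Joined group at generation 2 with protocol version 1",
            "INFO GET /connector-plugins/ HTTP/1.1 404",
            "INFO REST resources initialized; server is started and ready to handle requests");
        List<String> atTimeOfRequest = log.subList(0, 1);
        System.out.println(joinedGroup(atTimeOfRequest));       // the test proceeds here
        System.out.println(readyForRestCalls(atTimeOfRequest)); // but REST is not ready yet
        System.out.println(readyForRestCalls(log));
    }
}
```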





[jira] [Commented] (KAFKA-14598) Fix flaky ConnectRestApiTest

2023-08-04 Thread Ashwin Pankaj (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751051#comment-17751051
 ] 

Ashwin Pankaj commented on KAFKA-14598:
---

Did not see this occurring recently - closing this issue.

> Fix flaky ConnectRestApiTest
> 
>
> Key: KAFKA-14598
> URL: https://issues.apache.org/jira/browse/KAFKA-14598
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ashwin Pankaj
>Assignee: Ashwin Pankaj
>Priority: Minor
>  Labels: flaky-test
>
> ConnectRestApiTest sometimes fails with the message
> {{ConnectRestError(404, '\n\n content="text/html;charset=ISO-8859-1"/>\nError 404 Not 
> Found\n\nHTTP ERROR 404 Not 
> Found\n\nURI:/connector-plugins/\nSTATUS:404\nMESSAGE:Not
>  
> Found\nSERVLET:-\n\n\n\n\n',
>  'http://172.31.1.75:8083/connector-plugins/')}}
> This happens because ConnectDistributedService.start() by default waits till 
> the line
> {{Joined group at generation ..}} is visible in the logs.
> In most cases this is sufficient. But in the cases where the test fails, we 
> see that this message appears even before Connect RestServer has finished 
> initialization.
>  {quote}   - [2022-12-15 15:40:29,064] INFO [Worker clientId=connect-1, 
> groupId=connect-cluster] Joined group at generation 2 with protocol version 1 
> and got assignment: Assignment{error=0, 
> leader='connect-1-07d9da63-9acb-4633-aee4-1ab79f4ab1ae', 
> leaderUrl='http://worker34:8083/', offset=-1, connectorIds=[], taskIds=[], 
> revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> - [2022-12-15 15:40:29,560] INFO 172.31.5.66 - - [15/Dec/2022:15:40:29 
> +] "GET /connector-plugins/ HTTP/1.1" 404 375 "-" 
> "python-requests/2.24.0" 71 (org.apache.kafka.connect.runtime.rest.RestServer)
> - [2022-12-15 15:40:29,579] INFO REST resources initialized; server is 
> started and ready to handle requests 
> (org.apache.kafka.connect.runtime.rest.RestServer)
> {quote}
>  





[jira] [Commented] (KAFKA-15083) Passing "remote.log.metadata.*" configs into RLMM

2023-08-04 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751036#comment-17751036
 ] 

Luke Chen commented on KAFKA-15083:
---

[~satish.duggana] , sorry that I didn't get the ping last time. 

The issue here is that, when configuring RLMM, the configs passed into the 
`configure` method are the `RemoteLogManagerConfig`. But 
`RemoteLogManagerConfig` has no configs related to `remote.log.metadata.*`, 
e.g. `remote.log.metadata.topic.replication.factor`. So even if users have set 
the config in the broker, it'll never be applied. Working on a patch now.

> Passing "remote.log.metadata.*" configs into RLMM
> -
>
> Key: KAFKA-15083
> URL: https://issues.apache.org/jira/browse/KAFKA-15083
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Based on the 
> [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
> |_remote.log.metadata.*_|Default RLMM implementation creates producer and 
> consumer instances. Common client properties can be configured with 
> `remote.log.metadata.common.client.` prefix. User can also pass properties 
> specific to producer/consumer with `remote.log.metadata.producer.` 
> and `remote.log.metadata.consumer.` prefixes. These will override properties 
> with `remote.log.metadata.common.client.` prefix.
> Any other properties should be prefixed with 
> "remote.log.metadata." and these will be passed to 
> RemoteLogMetadataManager#configure(Map props).
> For ex: Security configuration to connect to the local broker 
> for the listener name configured are passed with props.|
>  
> This is missing from the current implementation.
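
The override rule quoted from KIP-405 can be sketched in plain Java: properties under the common client prefix form the base, and producer- or consumer-specific properties replace them on conflict. This is an illustrative sketch only; the class and method names are hypothetical, and stripping the prefixes before merging is an assumption about how the resulting map would be handed to a client.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; not part of Kafka. */
class ClientPropsSketch {
    static final String COMMON_PREFIX = "remote.log.metadata.common.client.";
    static final String PRODUCER_PREFIX = "remote.log.metadata.producer.";
    static final String CONSUMER_PREFIX = "remote.log.metadata.consumer.";

    /** Returns the entries under the prefix with the prefix removed from each key. */
    static Map<String, Object> strip(Map<String, ?> props, String prefix) {
        Map<String, Object> out = new HashMap<>();
        for (Map.Entry<String, ?> entry : props.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                out.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return out;
    }

    /** Common client properties first, then the specific ones, which win on conflict. */
    static Map<String, Object> clientProps(Map<String, ?> props, String specificPrefix) {
        Map<String, Object> merged = strip(props, COMMON_PREFIX);
        merged.putAll(strip(props, specificPrefix));
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> broker = new HashMap<>();
        broker.put(COMMON_PREFIX + "security.protocol", "SSL");
        broker.put(COMMON_PREFIX + "client.id", "rlmm");
        broker.put(PRODUCER_PREFIX + "client.id", "rlmm-producer");
        System.out.println(clientProps(broker, PRODUCER_PREFIX));
        System.out.println(clientProps(broker, CONSUMER_PREFIX));
    }
}
```

With this input the producer map takes `client.id` from the producer-specific entry, while the consumer map falls back to the common value.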





[jira] [Commented] (KAFKA-15267) Cluster-wide disablement of Tiered Storage

2023-08-04 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751033#comment-17751033
 ] 

Luke Chen commented on KAFKA-15267:
---

Do we have any update on this issue? I think this is a blocker for v3.6.0, do 
you agree?

> Cluster-wide disablement of Tiered Storage
> --
>
> Key: KAFKA-15267
> URL: https://issues.apache.org/jira/browse/KAFKA-15267
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>  Labels: tiered-storage
>
> h2. Summary
> KIP-405 defines the configuration {{remote.log.storage.system.enable}} which 
> controls whether all resources needed for Tiered Storage to function are 
> instantiated properly in Kafka. However, the interaction between remote data 
> and Kafka is undefined if that configuration is set to false while there are 
> still topics with {{remote.storage.enable}} set. *We would like to give 
> customers the ability to switch off Tiered Storage on a cluster level and as 
> such would need to define the behaviour.*
> {{remote.log.storage.system.enable}} is a read-only configuration. This means 
> that it can only be changed by *modifying the server.properties* and 
> restarting brokers. As such, the *validity of values contained in it is 
> only checked at broker startup*.
> This JIRA proposes a few behaviours and a recommendation on a way forward.
> h2. Option 1: Change nothing
> Pros:
>  * No operation.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
> h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster 
> level and do not allow it to be disabled
> Always instantiate all resources for tiered storage. If no special ones are 
> selected use the default ones which come with Kafka.
> Pros:
>  * We solve the problem for moving between versions not allowing TS to be 
> disabled.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
>  * We haven’t quantified how many compute resources (CPU, memory) idle TS 
> components occupy.
>  * TS is a feature not required for running Kafka. As such, while it is still 
> under development we shouldn’t put it on the critical path of starting a 
> broker. In this way, a stray memory leak won’t impact anything on the 
> critical path of a broker.
>  * We are potentially swapping one problem for another. How does TS behave if 
> one decides to swap the TS plugin classes when data has already been written?
> h2. Option 3: Hide topics with tiering enabled
> Customers cannot interact with topics which have tiering enabled. They cannot 
> create new topics with the same names. Retention (and compaction?) do not 
> take effect on files already in local storage.
> Pros:
>  * We do not force data-deletion.
> Cons:
>  * This will be quite involved - the controller will need to know when a 
> broker’s server.properties have been altered; the broker will need to not 
> proceed to delete logs it is not the leader or follower for.
> h2. Option 4: Do not start the broker if there are topics with 
> tiering enabled - Recommended
> This option has 2 different sub-options. The first one is that TS cannot be 
> disabled on cluster-level if there are *any* tiering topics - in other words 
> all tiered topics need to be deleted. The second one is that TS cannot be 
> disabled on a cluster-level if there are *any* topics with *tiering enabled* 
> - they can have tiering disabled, but with a retention policy set to delete 
> or retain (as per 
> [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]).
>  A topic can have tiering disabled and remain on the cluster as long as there 
> is no *remote* data when TS is disabled cluster-wide.
> Pros:
>  * We force the customer to be very explicit in disabling tiering of topics 
> prior to disabling TS on the whole cluster.
> Cons:
>  * You have to make certain that all data in remote is deleted (just a 
> disablement of a tiered topic is not enough). How do you determine whether all 
> remote data has expired if the policy is retain? If the retain policy in 
> KIP-950 knows that there is data in remote then this should also be able to 
> figure it out.
> The common denominator is that there needs to be no *remote* data at the 
> point of disabling TS. As such, the most straightforward option is to refuse 
> to start brokers if there are topics with {{remote.storage.enable}} 
> set. This in essence requires customers to clean any tiered topics before 
> switching off TS, which is a fair ask. Should we wish to revise this later it 
> should be possible.
> h2. Option 5: Make Kafka forget about all remote 

[GitHub] [kafka] satishd commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-04 Thread via GitHub


satishd commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1284146317


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -623,10 +632,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment 
segment, long nextSegment
 producerStateSnapshotFile.toPath(), leaderEpochsIndex);
 
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark();
 brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
-remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, 
segmentData);
+Optional customMetadata = 
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
 
 RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new 
RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+customMetadata, 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+if (customMetadata.isPresent()) {
+long customMetadataSize = customMetadata.get().value().length;
+if (customMetadataSize > this.customMetadataSizeLimit) {
+CustomMetadataSizeLimitExceededException e = new 
CustomMetadataSizeLimitExceededException();
+logger.error("Custom metadata size {} exceeds configured 
limit {}." +
+" Copying will be stopped and copied 
segment will be attempted to clean." +
+" Original metadata: {}",
+customMetadataSize, this.customMetadataSizeLimit, 
copySegmentStartedRlsm, e);
+try {
+// For deletion, we provide back the custom metadata 
by creating a new metadata object from the update.
+// However, the update itself will not be stored in 
this case.
+
remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));

Review Comment:
   That looks reasonable to me. 
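
   For readers skimming the diff, the control flow under review can be reduced to the following sketch: check the size of the optional custom metadata against the limit and, if it is too large, clean up the already copied segment instead of recording the update. This is a simplified stand-in, not the `RemoteLogManager` code; the class, the method names and the use of a `Consumer<String>` as the delete callback are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

/** Hypothetical reduction of the size-limit handling; not part of Kafka. */
class CustomMetadataLimitSketch {

    /** Absent metadata always fits; present metadata must not exceed the limit. */
    static boolean withinLimit(Optional<byte[]> customMetadata, int sizeLimit) {
        return customMetadata.map(bytes -> bytes.length <= sizeLimit).orElse(true);
    }

    /**
     * Returns true if the copy may be recorded as finished. Otherwise the copied
     * segment is handed to the delete callback and false is returned.
     */
    static boolean finishCopy(String segmentId,
                              Optional<byte[]> customMetadata,
                              int sizeLimit,
                              Consumer<String> deleteSegment) {
        if (!withinLimit(customMetadata, sizeLimit)) {
            deleteSegment.accept(segmentId);
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> deleted = new ArrayList<>();
        System.out.println(finishCopy("segment-a", Optional.of(new byte[8]), 16, deleted::add));
        System.out.println(finishCopy("segment-b", Optional.of(new byte[32]), 16, deleted::add));
        System.out.println(deleted);
    }
}
```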






[jira] [Commented] (KAFKA-15295) Add config validation when remote storage is enabled on a topic

2023-08-04 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751031#comment-17751031
 ] 

Luke Chen commented on KAFKA-15295:
---

[~ckamal] , are you still working on this? If you don't have time, I can work 
on it. Please let me know. Thanks.

> Add config validation when remote storage is enabled on a topic
> ---
>
> Key: KAFKA-15295
> URL: https://issues.apache.org/jira/browse/KAFKA-15295
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
> Fix For: 3.6.0
>
>
> If system level remote storage is not enabled, then enabling remote storage 
> on a topic should throw exception while validating the configs. 
> See https://github.com/apache/kafka/pull/14114#discussion_r1280372441 for 
> more details





[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-04 Thread via GitHub


ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1284145024


##
storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java:
##
@@ -66,6 +68,11 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
  */
 private final int segmentSizeInBytes;
 
+/**
+ * Custom metadata.

Review Comment:
   Thanks, updated






[GitHub] [kafka] showuon commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-04 Thread via GitHub


showuon commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1284105567


##
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##
@@ -81,6 +81,13 @@ public final class RemoteLogManagerConfig {
 public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC = 
"Listener name of the local broker to which it should get connected if " +
 "needed by RemoteLogMetadataManager implementation.";
 
+public static final String 
REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_SIZE_PROP = 
"remote.log.metadata.custom.metadata.max.size";

Review Comment:
   No concern. Thanks.






[GitHub] [kafka] cadonna merged pull request #14145: KAFKA-10199: Change to RUNNING if no pending task to recycle exist

2023-08-04 Thread via GitHub


cadonna merged PR #14145:
URL: https://github.com/apache/kafka/pull/14145





[GitHub] [kafka] cadonna commented on pull request #14145: KAFKA-10199: Change to RUNNING if no pending task to recycle exist

2023-08-04 Thread via GitHub


cadonna commented on PR #14145:
URL: https://github.com/apache/kafka/pull/14145#issuecomment-1665114431

   Build failures are unrelated:
   ```
   Build / JDK 17 and Scala 2.13 / 
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOffsetSyncsTopicsOnTarget()
   Build / JDK 11 and Scala 2.13 / 
kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription(boolean).hasConsumerRack
 = false
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.clients.consumer.StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription(boolean).hasConsumerRack
 = false
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.runtime.rest.ConnectRestServerTest.testCORSDisabled
   Build / JDK 20 and Scala 2.13 / 
org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription(boolean).hasConsumerRack
 = false
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 20 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 20 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   ```





[GitHub] [kafka] showuon commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-04 Thread via GitHub


showuon commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1284009383


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,387 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-private final Time time;
+private final Time time = new SystemTime();
 
+// TODO - Update comments below
 // It indicates whether the closing process has been started or not. If it 
is set as true,
 // consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-private volatile boolean closing = false;
-
+private volatile boolean isClosed = false;
 // It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
 // determined that the consumer needs to be assigned with the updated 
partitions.
-private volatile boolean assignPartitions = false;
+private volatile boolean isAssignmentChanged = true;
 
 // It represents a lock for any operations related to the 
assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+private volatile Set assignedMetadataPartitions = 
Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = 
Collections.emptySet();
+private volatile Map 
assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-// may or may not have been processed based on the assigned topic 
partitions.
-private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+private long uninitializedAt = time.milliseconds();
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile committedOffsetsFile;
-private long lastSyncedTimeMs;
+private Map 
offsetHolderByMetadataPartition = new HashMap<>();
+private boolean isOffsetsFetchFailed = false;
+private long lastFailedFetchOffsetsTimestamp;
 
-public ConsumerTask(KafkaConsumer consumer,
-String metadataTopicName,
-RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
 RemoteLogMetadataTopicPartitioner topicPartitioner,
-Path committedOffsetsPath,
-Time time,
-long committedOffsetSyncIntervalMs) {
-this.consumer = Objects.requireNonNull(consumer);
-this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+Function, Consumer> 
consumerSupplier) {
+this.consumer = consumerSupplier.apply(Optional.empty());
 this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
 this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-this.time = Objects.requireNonNull(time);
-this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-initializeConsumerAssignment(committedOffsetsPath);
-}
-
-private void initializeConsumerAssignment(Path committedOffsetsPath) {
-try {
-committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
-} catch (IOException e) {
-throw new KafkaException(e);
-}
-
-Map committedOffsets = Collections.emptyMap();
-try {
-// Load committed offset and 

[GitHub] [kafka] lihaosky opened a new pull request, #14150: KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer

2023-08-04 Thread via GitHub


lihaosky opened a new pull request, #14150:
URL: https://github.com/apache/kafka/pull/14150

   ### Description
   1. Add configs for rack aware assignor
   2. Update standby optimizer in `RackAwareTaskAssignor` to have more rounds
   3. Refactor some methods in `RackAwareTaskAssignorTest` into test utils so that 
they can also be used in `HighAvailabilityTaskAssignorTest` and other tests
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] satishd commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-04 Thread via GitHub


satishd commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1284030338


##
storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java:
##
@@ -66,6 +68,11 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
  */
 private final int segmentSizeInBytes;
 
+/**
+ * Custom metadata.

Review Comment:
   I think having that in `CustomMetadata` makes sense. 


