[GitHub] [kafka] abhijeetk88 opened a new pull request, #13944: [DRAFT PR - WIP] KAFKA-14953: Adding RemoteLogManager metrics

2023-06-30 Thread via GitHub


abhijeetk88 opened a new pull request, #13944:
URL: https://github.com/apache/kafka/pull/13944

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] abhijeetk88 commented on pull request #13943: [DRAFT] KAFKA-14953 - Added metrics related to tiered storage.

2023-06-30 Thread via GitHub


abhijeetk88 commented on PR #13943:
URL: https://github.com/apache/kafka/pull/13943#issuecomment-1615504449

   Will raise a new one.





[GitHub] [kafka] abhijeetk88 closed pull request #13943: [DRAFT] KAFKA-14953 - Added metrics related to tiered storage.

2023-06-30 Thread via GitHub


abhijeetk88 closed pull request #13943: [DRAFT] KAFKA-14953 - Added metrics 
related to tiered storage.
URL: https://github.com/apache/kafka/pull/13943





[GitHub] [kafka] abhijeetk88 opened a new pull request, #13943: [DRAFT PR] Kafka 14953 - WIP

2023-06-30 Thread via GitHub


abhijeetk88 opened a new pull request, #13943:
URL: https://github.com/apache/kafka/pull/13943

   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] github-actions[bot] commented on pull request #12844: KAFKA-14353: Allow configuring request timeouts for create/update/validate Kafka Connect REST endpoints

2023-06-30 Thread via GitHub


github-actions[bot] commented on PR #12844:
URL: https://github.com/apache/kafka/pull/12844#issuecomment-1615427934

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





[GitHub] [kafka] github-actions[bot] commented on pull request #12849: MINOR: Fix commitId maybe null

2023-06-30 Thread via GitHub


github-actions[bot] commented on PR #12849:
URL: https://github.com/apache/kafka/pull/12849#issuecomment-1615427881

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





[GitHub] [kafka] wcarlson5 opened a new pull request, #13942: KAFKA-14936: Check the versioned table's history retention and compare to grace period (4/N)

2023-06-30 Thread via GitHub


wcarlson5 opened a new pull request, #13942:
URL: https://github.com/apache/kafka/pull/13942

   Check the history retention of the versioned KTable used in the grace-period 
join and compare it to the grace period. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-15138) Java kafka-clients compression dependencies should be optional

2023-06-30 Thread Joe DiPol (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe DiPol updated KAFKA-15138:
--
Description: 
If you look at

[https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.4.0/kafka-clients-3.4.0.pom]

You see that the dependencies for the compression libraries (like lz4-java) do 
NOT have "{{<optional>true</optional>}}". That means that these libraries 
are transitive dependencies which will be pulled (and potentially security 
scanned) for any project that uses kafka-clients. 

This is not correct. These compression libraries are optional and should not be 
transitive dependencies of kafka-clients. Therefore the above pom should state 
{{optional}} like:

{{
<dependency>
    <groupId>org.lz4</groupId>
    <artifactId>lz4-java</artifactId>
    <version>1.8.0</version>
    <scope>runtime</scope>
    <optional>true</optional>
</dependency>
}}

  was:
If you look at

[https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.4.0/kafka-clients-3.4.0.pom]

You see that the dependencies for the compression libraries (like lz4-java) do 
NOT have "{{<optional>true</optional>}}". That means that these libraries 
are transitive dependencies which will be pulled (and potentially security 
scanned) for any project that uses kafka-clients. 

This is not correct. These compression libraries are optional and should not be 
transitive dependencies of kafka-clients. Therefore the above pom should state 
{{optional}} like:

<dependency>
    <groupId>org.lz4</groupId>
    <artifactId>lz4-java</artifactId>
    <version>1.8.0</version>
    <scope>runtime</scope>
    <optional>true</optional>
</dependency>



> Java kafka-clients compression dependencies should be optional
> --
>
> Key: KAFKA-15138
> URL: https://issues.apache.org/jira/browse/KAFKA-15138
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.4.0
>Reporter: Joe DiPol
>Priority: Major
>
> If you look at
> [https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.4.0/kafka-clients-3.4.0.pom]
> You see that the dependencies for the compression libraries (like lz4-java) 
> do NOT have "{{<optional>true</optional>}}". That means that these 
> libraries are transitive dependencies which will be pulled (and potentially 
> security scanned) for any project that uses kafka-clients. 
> This is not correct. These compression libraries are optional and should not 
> be transitive dependencies of kafka-clients. Therefore the above pom should 
> state {{optional}} like:
> {{
> <dependency>
>     <groupId>org.lz4</groupId>
>     <artifactId>lz4-java</artifactId>
>     <version>1.8.0</version>
>     <scope>runtime</scope>
>     <optional>true</optional>
> </dependency>
> }}
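
Until the pom marks these dependencies optional, a downstream project that 
never enables lz4 compression can exclude the library on its own side. A 
minimal sketch of such an exclusion in a consumer's pom (note this is unsafe 
if anything in the project actually sets compression.type=lz4):

```
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.4.0</version>
  <exclusions>
    <!-- Workaround sketch: drop the transitive compression dependency.
         Only safe if lz4 compression is never used at runtime. -->
    <exclusion>
      <groupId>org.lz4</groupId>
      <artifactId>lz4-java</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```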





[jira] [Created] (KAFKA-15138) Java kafka-clients compression dependencies should be optional

2023-06-30 Thread Joe DiPol (Jira)
Joe DiPol created KAFKA-15138:
-

 Summary: Java kafka-clients compression dependencies should be 
optional
 Key: KAFKA-15138
 URL: https://issues.apache.org/jira/browse/KAFKA-15138
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.4.0
Reporter: Joe DiPol


If you look at

[https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.4.0/kafka-clients-3.4.0.pom]

You see that the dependencies for the compression libraries (like lz4-java) do 
NOT have "{{<optional>true</optional>}}". That means that these libraries 
are transitive dependencies which will be pulled (and potentially security 
scanned) for any project that uses kafka-clients. 

This is not correct. These compression libraries are optional and should not be 
transitive dependencies of kafka-clients. Therefore the above pom should state 
{{optional}} like:

<dependency>
    <groupId>org.lz4</groupId>
    <artifactId>lz4-java</artifactId>
    <version>1.8.0</version>
    <scope>runtime</scope>
    <optional>true</optional>
</dependency>






[jira] [Updated] (KAFKA-14335) Admin.listConsumerGroups should allow filtering, pagination

2023-06-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14335:
--
Labels: needs-kip  (was: )

> Admin.listConsumerGroups should allow filtering, pagination
> ---
>
> Key: KAFKA-14335
> URL: https://issues.apache.org/jira/browse/KAFKA-14335
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, protocol
>Affects Versions: 3.3.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: needs-kip
>
> The 
> [Admin|https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/admin/Admin.html]
>  API provides a means for clients to list the consumer groups in the cluster. 
> When the list of consumer groups becomes very large, it can cause problems 
> for the client (e.g., OOM errors) as well as overhead for the broker and 
> network.
> The proposal is to enhance the 
> [ListConsumerGroupsOptions|https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.html]
>  class to have optional values such as:
>  * Consumer group ID regex (evaluated on broker)
>  * Pagination token (consumer group ID, probably)
> This will require a KIP since it is enhancing the admin API, protocol, and 
> broker.
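
For illustration, a hypothetical sketch of how the enhanced options might be 
used. The groupIdRegex and pageToken setters below are invented names for the 
proposed values and do not exist in the current API; defining their actual 
shape is exactly what the KIP would cover.

```
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;

import java.util.Collection;
import java.util.Properties;

public class ListGroupsSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            ListConsumerGroupsOptions options = new ListConsumerGroupsOptions();
            // Proposed, non-existent options -- illustrative names only:
            // options.groupIdRegex("payments-.*"); // regex evaluated on the broker
            // options.pageToken("payments-42");    // resume listing after this group id
            Collection<ConsumerGroupListing> groups =
                admin.listConsumerGroups(options).all().get();
            groups.forEach(g -> System.out.println(g.groupId()));
        }
    }
}
```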





[GitHub] [kafka] riedelmax opened a new pull request, #13941: KAFKA-15123: Add tests for ChunkedBytesStream

2023-06-30 Thread via GitHub


riedelmax opened a new pull request, #13941:
URL: https://github.com/apache/kafka/pull/13941

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248282166


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
         assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
         assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment:
   Hmm -- reset meaning we should refresh? I guess my point was that if we 
lower the epoch we may delay the reset. I guess worst case, we just have to 
wait for the refresh interval though.






[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248223476


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
         assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
         assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment:
   I think so. The epoch value is not so important here. The important part is 
that it should be reset regardless.






[GitHub] [kafka] divijvaidya commented on pull request #13935: MINOR: Fix debug logs to display TimeIndexOffset

2023-06-30 Thread via GitHub


divijvaidya commented on PR #13935:
URL: https://github.com/apache/kafka/pull/13935#issuecomment-1615046002

   Unrelated test failures
   ```
   [Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationWithEmptyPartition()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13935/2/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest/Build___JDK_8_and_Scala_2_12___testReplicationWithEmptyPartition__/)
   [Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigs()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13935/2/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest/Build___JDK_8_and_Scala_2_12___testSyncTopicConfigs__/)
   [Build / JDK 8 and Scala 2.12 / kafka.admin.TopicCommandIntegrationTest.testDescribeAtMinIsrPartitions(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13935/2/testReport/junit/kafka.admin/TopicCommandIntegrationTest/Build___JDK_8_and_Scala_2_12___testDescribeAtMinIsrPartitions_String__quorum_kraft/)
   [Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testSubscribeWhenTopicUnavailable()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13935/2/testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_8_and_Scala_2_12___testSubscribeWhenTopicUnavailable__/)
   [Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13935/2/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testBalancePartitionLeaders__/)
   [Build / JDK 11 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13935/2/testReport/junit/kafka.zk/ZkMigrationIntegrationTest/Build___JDK_11_and_Scala_2_131__Type_ZK__Name_testDualWrite__MetadataVersion_3_4_IV0__Security_PLAINTEXT/)
   [Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13935/2/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_11_and_Scala_2_13___testBalancePartitionLeaders__/)
   [Build / JDK 11 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13935/2/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_11_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/)
   ```
   
   @showuon requesting review for this.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

2023-06-30 Thread via GitHub


divijvaidya commented on code in PR #13284:
URL: https://github.com/apache/kafka/pull/13284#discussion_r1248166033


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##
@@ -40,16 +47,16 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.connect.mirror.MirrorMaker.CONNECTOR_CLASSES;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 
 @Tag("integration")
 public class DedicatedMirrorIntegrationTest {
 
     private static final Logger log = LoggerFactory.getLogger(DedicatedMirrorIntegrationTest.class);
-
-    private static final int TOPIC_CREATION_TIMEOUT_MS = 120_000;

Review Comment:
   Reverted back to the original since we now have better logic to ensure the 
correct startup sequence.






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

2023-06-30 Thread via GitHub


divijvaidya commented on code in PR #13284:
URL: https://github.com/apache/kafka/pull/13284#discussion_r1248164637


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##
@@ -273,6 +288,30 @@ private void writeToTopic(EmbeddedKafkaCluster cluster, String topic, int numMes
     }
 }
 
+private void awaitMirrorMakerStart(final SourceAndTarget sourceAndTarget) throws InterruptedException {
+    waitForCondition(() -> {
+        try {
+            return mirrorMakers.values().stream().allMatch(
+                mm -> CONNECTOR_CLASSES.stream().allMatch(
+                    connectorClazz -> isConnectorRunningForMirrorMaker(connectorClazz, mm, sourceAndTarget)));
+        } catch (Exception ex) {
+            log.error("Something unexpected occurred. Unable to check for startup status for mirror maker for {}", sourceAndTarget, ex);
+            throw ex;
+        }
+    }, MM_START_UP_TIMEOUT_MS, "MirrorMaker instances did not transition to running in time");
+}
+
+private  void awaitConnectorTasksStart(final Class clazz, final String source, String target) throws InterruptedException {
+    waitForCondition(() -> {
+        try {
+            return mirrorMakers.values().stream().allMatch(mm -> isTaskRunningForMirrorMakerConnector(clazz, mm, source, target));

Review Comment:
   Done. Only checking for one node now.



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##
@@ -273,6 +288,30 @@ private void writeToTopic(EmbeddedKafkaCluster cluster, String topic, int numMes
     }
 }
 
+private void awaitMirrorMakerStart(final SourceAndTarget sourceAndTarget) throws InterruptedException {
+    waitForCondition(() -> {
+        try {
+            return mirrorMakers.values().stream().allMatch(
Review Comment:
   Done in latest revision.






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

2023-06-30 Thread via GitHub


divijvaidya commented on code in PR #13284:
URL: https://github.com/apache/kafka/pull/13284#discussion_r1248164007


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##
@@ -288,4 +327,28 @@ private void awaitTopicContent(EmbeddedKafkaCluster cluster, String clusterName,
     }
 }
 
+/**
+ * Validates that the underlying connectors are running for the given MirrorMaker.
+ */
+private boolean isConnectorRunningForMirrorMaker(final Class connectorClazz, final MirrorMaker mm, final SourceAndTarget sourceAndTarget) {
+    final String connName = connectorClazz.getSimpleName();
+    final ConnectorStateInfo connectorStatus = mm.connectorStatus(sourceAndTarget, connName);
+    return connectorStatus != null
+        // verify that connector state is set to running
+        && connectorStatus.connector().state().equals(AbstractStatus.State.RUNNING.toString());

Review Comment:
   Done in latest revision. 






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

2023-06-30 Thread via GitHub


divijvaidya commented on code in PR #13284:
URL: https://github.com/apache/kafka/pull/13284#discussion_r1248163268


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##
@@ -288,4 +327,28 @@ private void awaitTopicContent(EmbeddedKafkaCluster cluster, String clusterName,
     }
 }
 
+/**
+ * Validates that the underlying connectors are running for the given MirrorMaker.
+ */
+private boolean isConnectorRunningForMirrorMaker(final Class connectorClazz, final MirrorMaker mm, final SourceAndTarget sourceAndTarget) {
+    final String connName = connectorClazz.getSimpleName();
+    final ConnectorStateInfo connectorStatus = mm.connectorStatus(sourceAndTarget, connName);
+    return connectorStatus != null
+        // verify that connector state is set to running
+        && connectorStatus.connector().state().equals(AbstractStatus.State.RUNNING.toString());
+}
+
+/**
+ * Validates that the tasks are associated with the connector and they are running for the given MirrorMaker.
+ */
+private  boolean isTaskRunningForMirrorMakerConnector(final Class connectorClazz, final MirrorMaker mm, final String source, final String target) {
+    final SourceAndTarget sourceAndTarget = new SourceAndTarget(source, target);
+    final String connName = connectorClazz.getSimpleName();
+    final ConnectorStateInfo connectorStatus = mm.connectorStatus(sourceAndTarget, connName);
+    return isConnectorRunningForMirrorMaker(connectorClazz, mm, sourceAndTarget)
+        // verify that at least one task exists
+        && !connectorStatus.tasks().isEmpty()
+        // verify that tasks are set to running
+        && connectorStatus.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));

Review Comment:
   Done in latest revision.






[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

2023-06-30 Thread via GitHub


divijvaidya commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1615032737

   I had to rebase with trunk to resolve merge conflicts. Changes in latest 
revision.
   
   1\ Moved  `awaitMirrorMakerStart` into the test cases.
   2\ Only check for one mirror maker node to start instead of all of them.
   3\ Only check for connector tasks in one MM node instead of all of the nodes.
   4\ Use NoRetryException to fail fast in case of a task/connector failure. 
(Thank you for introducing me to this nice test utility!)
   5\ Task/Connector failure is already logged at ERROR level.
   
   This test passes locally. @C0urante ready for review.
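   
   For reference, point 4 relies on the standard Kafka test utilities: 
`TestUtils.waitForCondition` retries the condition until it returns true or 
the timeout elapses, while throwing `NoRetryException` from inside the 
condition aborts immediately. A minimal sketch of that fail-fast pattern, 
where the `connectorState` probe is a hypothetical stand-in for the 
MirrorMaker status checks:
   
   ```
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;

public class FailFastWaitSketch {
    // Hypothetical probe standing in for mm.connectorStatus(...) checks.
    static String connectorState() {
        return "RUNNING";
    }

    public static void main(String[] args) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            String state = connectorState();
            if ("FAILED".equals(state)) {
                // A failed connector never becomes RUNNING: abort the wait
                // immediately instead of retrying until the timeout.
                throw new NoRetryException(new AssertionError("connector failed"));
            }
            return "RUNNING".equals(state);
        }, 30_000L, "connector did not transition to RUNNING in time");
    }
}
   ```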





[jira] [Commented] (KAFKA-15091) Javadocs for SourceTask::commit are incorrect

2023-06-30 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739196#comment-17739196
 ] 

Chris Egerton commented on KAFKA-15091:
---

I think that was also discussed with KAFKA-5716. I wouldn't necessarily be 
opposed to deprecating {{SourceTask::commit}}, but given that we're several 
years further along than when that ticket was last discussed, the likelihood of 
connectors relying on that method has increased. We also currently make use of 
this method in MirrorMaker 2 (see KAFKA-14610).

I think this ticket should focus on updating the docs for this method to be 
correct for all releases of Kafka Connect that invoke it; if we want to take 
more drastic action (which, again, I'm not currently opposed to), it probably 
makes sense to tackle that in a separate ticket.

> Javadocs for SourceTask::commit are incorrect
> -
>
> Key: KAFKA-15091
> URL: https://issues.apache.org/jira/browse/KAFKA-15091
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Priority: Major
>
> The Javadocs for {{SourceTask::commit}} state that the method should:
> {quote}Commit the offsets, up to the offsets that have been returned by 
> [{{poll()}}|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll()].
> {quote}
> However, this is obviously incorrect given how the Connect runtime (when not 
> configured with exactly-once support for source connectors) performs polling 
> and offset commits on separate threads. There's also some extensive 
> discussion on the semantics of that method in KAFKA-5716 where it's made 
> clear that altering the behavior of the runtime to align with the documented 
> semantics of that method is not a viable option.
> We should update the Javadocs for this method to state that it does not have 
> anything to do with the offsets returned from {{SourceTask:poll}} and is 
> instead just a general, periodically-invoked hook to let the task know that 
> an offset commit has taken place (but with no guarantees as to which offsets 
> have been committed and which ones correspond to still-in-flight records).
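
To make the suggested semantics concrete, a minimal sketch of a task that 
treats commit() as the coarse, periodically-invoked hook described above; the 
ackUpstreamThrough call is a hypothetical helper, not part of the Connect API:

```
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class ExampleSourceTask extends SourceTask {
    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        // connect to the upstream system
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        return Collections.emptyList(); // a real task returns upstream records here
    }

    @Override
    public void commit() throws InterruptedException {
        // Invoked periodically after the runtime commits *some* offsets. There is
        // no guarantee these cover everything returned by poll(), so treat this
        // only as a coarse "an offset commit happened" signal.
        // ackUpstreamThrough(lastSafeOffset); // hypothetical helper
    }

    @Override
    public void stop() {
        // disconnect from the upstream system
    }
}
```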





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248148380


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
         assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
         assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment:
   Ok -- so this is behavior we want then? I guess I was just having trouble 
seeing when we would update to the lower epoch.






[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248133399


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
         assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
         assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment:
   Yeah, you’re right. I actually use the current group epoch here because 
hasMetadataExpired would be true if epoch + 1.
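   
   For readers following the thread, a plausible sketch of the check under 
discussion (field names mirror the test; this is not the actual ConsumerGroup 
implementation): the metadata is treated as expired once the deadline passes, 
or whenever the epoch recorded with the deadline no longer matches the current 
group epoch, which is why setting the deadline with groupEpoch() + 1 keeps 
hasMetadataExpired true.
   
   ```
// Plausible sketch only; not the actual ConsumerGroup code.
class GroupSketch {
    static class DeadlineAndEpoch {
        long deadlineMs;
        int epoch;
    }

    private final DeadlineAndEpoch metadataRefreshDeadline = new DeadlineAndEpoch();
    private int groupEpoch;

    void setMetadataRefreshDeadline(long deadlineMs, int epoch) {
        metadataRefreshDeadline.deadlineMs = deadlineMs;
        metadataRefreshDeadline.epoch = epoch;
    }

    boolean hasMetadataExpired(long currentTimeMs) {
        // Expired when past the deadline, or when the recorded epoch no longer
        // matches the current group epoch (e.g. after a lost write).
        return currentTimeMs >= metadataRefreshDeadline.deadlineMs
            || groupEpoch != metadataRefreshDeadline.epoch;
    }
}
   ```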






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248102033


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
         assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
         assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment:
   I thought in the case the write is lost, we have the higher epoch in the 
DeadlineAndEpoch and that would signal us to continue to refresh. However, in 
this case, you are saying we would go back an epoch and decide not to try to 
refresh anymore. Is that correct?






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248100591


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -825,4 +828,60 @@ public void testClose() throws Exception {
     assertFutureThrows(write1, NotCoordinatorException.class);
     assertFutureThrows(write2, NotCoordinatorException.class);
 }
+
+@Test
+public void testOnNewMetadataImage() {
+    TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+    TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+    MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+    MockPartitionWriter writer = mock(MockPartitionWriter.class);
+    MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class);
+    MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+    CoordinatorRuntime runtime =
+        new CoordinatorRuntime.Builder()
+            .withLoader(loader)
+            .withEventProcessor(new MockEventProcessor())
+            .withPartitionWriter(writer)
+            .withCoordinatorBuilderSupplier(supplier)
+            .build();
+
+    MockCoordinator coordinator0 = mock(MockCoordinator.class);
+    MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+    when(supplier.get()).thenReturn(builder);
+    when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+    when(builder.withLogContext(any())).thenReturn(builder);
+    when(builder.build())
+        .thenReturn(coordinator0)
+        .thenReturn(coordinator1);
+
+    CompletableFuture<Void> future0 = new CompletableFuture<>();
+    when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+    CompletableFuture<Void> future1 = new CompletableFuture<>();
+    when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+    runtime.scheduleLoadOperation(tp0, 0);
+    runtime.scheduleLoadOperation(tp1, 0);
+
+    // Coordinator 0 is loaded. It should get the current image
+    // that is the empty one.
+    future0.complete(null);
+    verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+    // Publish a new image.
+    MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+    MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+    runtime.onNewMetadataImage(newImage, delta);
+
+    // Coordinator 0 should be notified about it.
+    verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment:
   Sorry I found it after posting. I should have looked a little longer.  






[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248097012


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -1932,6 +1961,435 @@ public void testPartitionAssignorExceptionOnRegularHeartbeat() {
             .setTopicPartitions(Collections.emptyList())));
 }
 
+@Test
+public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() {
+    String groupId = "fooup";
+    // Use a static member id as it makes the test easier.
+    String memberId = Uuid.randomUuid().toString();
+
+    Uuid fooTopicId = Uuid.randomUuid();
+    String fooTopicName = "foo";
+
+    // Create a context with one consumer group containing one member.
+    MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+    GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+        .withAssignors(Collections.singletonList(assignor))
+        .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+        .withMetadataImage(new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .build())
+        .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+            .withMember(new ConsumerGroupMember.Builder(memberId)
+                .setMemberEpoch(10)
+                .setPreviousMemberEpoch(10)
+                .setTargetMemberEpoch(10)
+                .setClientId("client")
+                .setClientHost("localhost/127.0.0.1")
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setServerAssignorName("range")
+                .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .build())
+            .withAssignment(memberId, mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2)))
+            .withAssignmentEpoch(10)
+            .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
+                {
+                    // foo only has 3 partitions stored in the metadata but foo has
+                    // 6 partitions in the metadata image.
+                    put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
+                }
+            }))
+        .build();
+
+    // The metadata refresh flag should be true.
+    ConsumerGroup consumerGroup = context.groupMetadataManager
+        .getOrMaybeCreateConsumerGroup(groupId, false);
+    assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+    // Prepare the assignment result.
+    assignor.prepareGroupAssignment(new GroupAssignment(
+        Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+        )))
+    ));
+
+    // Heartbeat.
+    CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(

Review Comment:
   gotcha.






[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248096864


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
         assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
         assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment:
   This could happen, for instance, if the write is lost, so the new epoch 
would not be known anymore.






[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248095763


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -564,9 +564,9 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
 
         // Set the refresh time deadline with a higher group epoch.
-        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch() + 1);

Review Comment:
   fixed.






[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248095382


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -564,9 +564,9 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
 
         // Set the refresh time deadline with a higher group epoch.
-        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch() + 1);

Review Comment:
   ah right, i did it wrong here. let me revert.






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248094484


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -564,9 +564,9 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
 
         // Set the refresh time deadline with a higher group epoch.
-        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch() + 1);

Review Comment:
   why did we remove +1000? Wouldn't a deadline set to the current time be 
expired anyway (regardless of epoch)






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248092632


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
         assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
         assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment:
   When we set the refresh deadline, there is no check on the group epoch. 
Maybe it's fine that we went backwards an epoch, but I'm not sure in which 
scenario that would happen. 






[GitHub] [kafka] lucasbru commented on pull request #13876: KAFKA-10733: Clean up producer exceptions

2023-06-30 Thread via GitHub


lucasbru commented on PR #13876:
URL: https://github.com/apache/kafka/pull/13876#issuecomment-1614928567

   @jolshan This makes sense. It seems to me that I need to go through the 
whole KIP and have an end-to-end solution before addressing these kinds of 
consistency problems. At least for the fencing exceptions, I'm leaning towards 
leaving them unwrapped now (and all other fatal exceptions). But I'm not super 
confident, because it seems Guozhang explicitly decided against that solution. 
Now, to make some progress here I would say I remove the wrapping changes to 
`ProducerFencedException` and `InvalidProducerEpochException` and we merge the 
other, less intrusive changes?
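   
   For context, the documented transactional-producer pattern already forces 
callers to distinguish fatal fencing errors from abortable ones, which is 
simplest when ProducerFencedException arrives unwrapped. A minimal sketch 
using only the standard producer API (the broker address and transactional.id 
are placeholders):
   
   ```
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

import java.util.Properties;

public class TransactionalSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("transactional.id", "my-tx-id");        // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // Fatal: another producer with the same transactional.id took over.
            // The transaction cannot be aborted; the only valid move is to close.
            producer.close();
        } catch (KafkaException e) {
            // Abortable error: roll back and (optionally) retry.
            producer.abortTransaction();
            producer.close();
        }
    }
}
   ```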





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248086301


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -825,4 +828,60 @@ public void testClose() throws Exception {
     assertFutureThrows(write1, NotCoordinatorException.class);
     assertFutureThrows(write2, NotCoordinatorException.class);
 }
+
+@Test
+public void testOnNewMetadataImage() {
+    TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+    TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+    MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+    MockPartitionWriter writer = mock(MockPartitionWriter.class);
+    MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class);
+    MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+    CoordinatorRuntime runtime =
+        new CoordinatorRuntime.Builder()
+            .withLoader(loader)
+            .withEventProcessor(new MockEventProcessor())
+            .withPartitionWriter(writer)
+            .withCoordinatorBuilderSupplier(supplier)
+            .build();
+
+    MockCoordinator coordinator0 = mock(MockCoordinator.class);
+    MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+    when(supplier.get()).thenReturn(builder);
+    when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+    when(builder.withLogContext(any())).thenReturn(builder);
+    when(builder.build())
+        .thenReturn(coordinator0)
+        .thenReturn(coordinator1);
+
+    CompletableFuture<Void> future0 = new CompletableFuture<>();
+    when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+    CompletableFuture<Void> future1 = new CompletableFuture<>();
+    when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+    runtime.scheduleLoadOperation(tp0, 0);
+    runtime.scheduleLoadOperation(tp1, 0);
+
+    // Coordinator 0 is loaded. It should get the current image
+    // that is the empty one.
+    future0.complete(null);
+    verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+    // Publish a new image.
+    MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+    MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+    runtime.onNewMetadataImage(newImage, delta);
+
+    // Coordinator 0 should be notified about it.
+    verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment:
   This part is handled in the `GroupMetadataManager#onNewMetadataImage` and it 
is tested separately.






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248092025


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -1932,6 +1961,435 @@ public void testPartitionAssignorExceptionOnRegularHeartbeat() {
             .setTopicPartitions(Collections.emptyList())));
 }
 
+@Test
+public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() {
+    String groupId = "fooup";
+    // Use a static member id as it makes the test easier.
+    String memberId = Uuid.randomUuid().toString();
+
+    Uuid fooTopicId = Uuid.randomUuid();
+    String fooTopicName = "foo";
+
+    // Create a context with one consumer group containing one member.
+    MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+    GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+        .withAssignors(Collections.singletonList(assignor))
+        .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+        .withMetadataImage(new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .build())
+        .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+            .withMember(new ConsumerGroupMember.Builder(memberId)
+                .setMemberEpoch(10)
+                .setPreviousMemberEpoch(10)
+                .setTargetMemberEpoch(10)
+                .setClientId("client")
+                .setClientHost("localhost/127.0.0.1")
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setServerAssignorName("range")
+                .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .build())
+            .withAssignment(memberId, mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2)))
+            .withAssignmentEpoch(10)
+            .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
+                {
+                    // foo only has 3 partitions stored in the metadata but foo has
+                    // 6 partitions in the metadata image.
+                    put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
+                }
+            }))
+        .build();
+
+    // The metadata refresh flag should be true.
+    ConsumerGroup consumerGroup = context.groupMetadataManager
+        .getOrMaybeCreateConsumerGroup(groupId, false);
+    assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+    // Prepare the assignment result.
+    assignor.prepareGroupAssignment(new GroupAssignment(
+        Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+        )))
+    ));
+
+    // Heartbeat.
+    CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(

Review Comment:
   Sorry. I was referring to how, when I asked about the heartbeat in the other 
test, I hadn't seen this one.






[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248091492


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
         assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
         assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment:
   btw, i just fixed the previous test case at L566.






[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248088555


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -825,4 +828,60 @@ public void testClose() throws Exception {
     assertFutureThrows(write1, NotCoordinatorException.class);
     assertFutureThrows(write2, NotCoordinatorException.class);
 }
+
+@Test
+public void testOnNewMetadataImage() {
+    TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+    TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+    MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+    MockPartitionWriter writer = mock(MockPartitionWriter.class);
+    MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class);
+    MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+    CoordinatorRuntime runtime =
+        new CoordinatorRuntime.Builder()
+            .withLoader(loader)
+            .withEventProcessor(new MockEventProcessor())
+            .withPartitionWriter(writer)
+            .withCoordinatorBuilderSupplier(supplier)
+            .build();
+
+    MockCoordinator coordinator0 = mock(MockCoordinator.class);
+    MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+    when(supplier.get()).thenReturn(builder);
+    when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+    when(builder.withLogContext(any())).thenReturn(builder);
+    when(builder.build())
+        .thenReturn(coordinator0)
+        .thenReturn(coordinator1);
+
+    CompletableFuture<Void> future0 = new CompletableFuture<>();
+    when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+    CompletableFuture<Void> future1 = new CompletableFuture<>();
+    when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+    runtime.scheduleLoadOperation(tp0, 0);
+    runtime.scheduleLoadOperation(tp1, 0);
+
+    // Coordinator 0 is loaded. It should get the current image
+    // that is the empty one.
+    future0.complete(null);
+    verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+    // Publish a new image.
+    MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+    MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+    runtime.onNewMetadataImage(newImage, delta);
+
+    // Coordinator 0 should be notified about it.
+    verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment:
   
https://github.com/apache/kafka/pull/13901/files/40fcd86ff81782e84d2d2835ac106b83fdfb32a9#diff-00f0f81cf13e66781777d94f7d2e68a581663385c37e98792507f2294c91bb09R1025






[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248087985


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -1932,6 +1961,435 @@ public void 
testPartitionAssignorExceptionOnRegularHeartbeat() {
 .setTopicPartitions(Collections.emptyList(;
 }
 
+@Test
+public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+// Create a context with one consumer group containing one member.
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setTargetMemberEpoch(10)
+.setClientId("client")
+.setClientHost("localhost/127.0.0.1")
+.setRebalanceTimeoutMs(5000)
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setServerAssignorName("range")
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2)))
+.build())
+.withAssignment(memberId, mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2)))
+.withAssignmentEpoch(10)
+.withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
+{
+// foo only has 3 partitions stored in the metadata but foo has
+// 6 partitions in the metadata image.
+put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
+}
+}))
+.build();
+
+// The metadata refresh flag should be true.
+ConsumerGroup consumerGroup = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
+
assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+// Prepare the assignment result.
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Heartbeat.
+CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(

Review Comment:
   Correct. I don't get what you mean though?






[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248087394


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -1932,6 +1961,435 @@ public void testPartitionAssignorExceptionOnRegularHeartbeat() {
 .setTopicPartitions(Collections.emptyList())));
 }
 
+@Test
+public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+// Create a context with one consumer group containing one member.
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setTargetMemberEpoch(10)
+.setClientId("client")
+.setClientHost("localhost/127.0.0.1")
+.setRebalanceTimeoutMs(5000)
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setServerAssignorName("range")
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2)))
+.build())
+.withAssignment(memberId, mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2)))
+.withAssignmentEpoch(10)
+.withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
+{
+// foo only has 3 partitions stored in the metadata but foo has
+// 6 partitions in the metadata image.
+put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
+}
+}))
+.build();
+
+// The metadata refresh flag should be true.
+ConsumerGroup consumerGroup = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
+
assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+// Prepare the assignment result.
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Heartbeat.
+CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(10));
+
+// The member gets partitions 3, 4 and 5 assigned.
+assertResponseEquals(
+new ConsumerGroupHeartbeatResponseData()
+.setMemberId(memberId)
+.setMemberEpoch(11)
+.setHeartbeatIntervalMs(5000)
+.setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+.setAssignedTopicPartitions(Arrays.asList(
+new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(fooTopicId)
+.setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5))
+))),
+result.response()
+);
+
+ConsumerGroupMember expectedMember = new 
ConsumerGroupMember.Builder(memberId)
+.setMemberEpoch(11)
+.setPreviousMemberEpoch(10)
+.setTargetMemberEpoch(11)
+.setClientId("client")
+.setClientHost("localhost/127.0.0.1")
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setServerAssignorName("range")
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+.build();
+
+List<Record> expectedRecords = Arrays.asList(
+RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
+{
+put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6));
+}
+}),
+RecordHelpers.newGroupEpochRecord(groupId, 11),
+RecordHelpers.newTargetAssignmentRecord(groupId, 

[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248086803


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
 assertTrue(group.hasMetadataExpired(time.milliseconds()));
 assertEquals(time.milliseconds() + 1000, 
group.metadataRefreshDeadline().deadlineMs);
 assertEquals(group.groupEpoch() + 1, 
group.metadataRefreshDeadline().epoch);
+
+// Set the refresh deadline.
+group.setMetadataRefreshDeadline(time.milliseconds() + 1000, 
group.groupEpoch());

Review Comment:
   I don't understand your comment. Could you elaborate?






[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248086301


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -825,4 +828,60 @@ public void testClose() throws Exception {
 assertFutureThrows(write1, NotCoordinatorException.class);
 assertFutureThrows(write2, NotCoordinatorException.class);
 }
+
+@Test
+public void testOnNewMetadataImage() {
+TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+MockPartitionWriter writer = mock(MockPartitionWriter.class);
+MockCoordinatorBuilderSupplier supplier = 
mock(MockCoordinatorBuilderSupplier.class);
+MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+CoordinatorRuntime<MockCoordinator, String> runtime =
+new CoordinatorRuntime.Builder<MockCoordinator, String>()
+.withLoader(loader)
+.withEventProcessor(new MockEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorBuilderSupplier(supplier)
+.build();
+
+MockCoordinator coordinator0 = mock(MockCoordinator.class);
+MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+when(supplier.get()).thenReturn(builder);
+when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+when(builder.withLogContext(any())).thenReturn(builder);
+when(builder.build())
+.thenReturn(coordinator0)
+.thenReturn(coordinator1);
+
+CompletableFuture<Void> future0 = new CompletableFuture<>();
+when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+CompletableFuture<Void> future1 = new CompletableFuture<>();
+when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+runtime.scheduleLoadOperation(tp0, 0);
+runtime.scheduleLoadOperation(tp1, 0);
+
+// Coordinator 0 is loaded. It should get the current image
+// that is the empty one.
+future0.complete(null);
+verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+// Publish a new image.
+MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+runtime.onNewMetadataImage(newImage, delta);
+
+// Coordinator 0 should be notified about it.
+verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment:
   This part is handled in the `GroupMetadataManager#onNewMetadataImage`.
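
   For readers following the thread, a rough sketch of the shape of that handler 
(simplified and from memory; the field and helper names below, such as 
`metadataImage`, `groups`, `subscribedTopicNames`, and `requestMetadataRefresh`, 
are stand-ins rather than a verbatim copy of the Kafka code):
   
   ```java
   // Sketch only: store the new image, then ask every group subscribed to a
   // topic touched by the delta to recompute its subscription metadata.
   public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
       metadataImage = newImage;

       // Topics whose state changed in this delta.
       Set<String> changedTopics = new HashSet<>();
       if (delta.topicsDelta() != null) {
           delta.topicsDelta().changedTopics().values()
               .forEach(topicDelta -> changedTopics.add(topicDelta.name()));
       }

       // Expire the refresh deadline of affected groups so their subscription
       // metadata is refreshed on the next heartbeat.
       groups.forEach((groupId, group) -> {
           if (group.subscribedTopicNames().keySet().stream()
                   .anyMatch(changedTopics::contains)) {
               group.requestMetadataRefresh();
           }
       });
   }
   ```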






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248086692


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -1932,6 +1961,435 @@ public void testPartitionAssignorExceptionOnRegularHeartbeat() {
 .setTopicPartitions(Collections.emptyList())));
 }
 
+@Test
+public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+// Create a context with one consumer group containing one member.
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setTargetMemberEpoch(10)
+.setClientId("client")
+.setClientHost("localhost/127.0.0.1")
+.setRebalanceTimeoutMs(5000)
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setServerAssignorName("range")
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2)))
+.build())
+.withAssignment(memberId, mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2)))
+.withAssignmentEpoch(10)
+.withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
+{
+// foo only has 3 partitions stored in the metadata but foo has
+// 6 partitions in the metadata image.
+put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
+}
+}))
+.build();
+
+// The metadata refresh flag should be true.
+ConsumerGroup consumerGroup = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
+
assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+// Prepare the assignment result.
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Heartbeat.
+CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(10));
+
+// The member gets partitions 3, 4 and 5 assigned.
+assertResponseEquals(
+new ConsumerGroupHeartbeatResponseData()
+.setMemberId(memberId)
+.setMemberEpoch(11)
+.setHeartbeatIntervalMs(5000)
+.setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+.setAssignedTopicPartitions(Arrays.asList(
+new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(fooTopicId)
+.setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5))
+))),
+result.response()
+);
+
+ConsumerGroupMember expectedMember = new 
ConsumerGroupMember.Builder(memberId)
+.setMemberEpoch(11)
+.setPreviousMemberEpoch(10)
+.setTargetMemberEpoch(11)
+.setClientId("client")
+.setClientHost("localhost/127.0.0.1")
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setServerAssignorName("range")
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+.build();
+
+List<Record> expectedRecords = Arrays.asList(
+RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
+{
+put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6));
+}
+}),
+RecordHelpers.newGroupEpochRecord(groupId, 11),
+

[GitHub] [kafka] C0urante merged pull request #13939: [MINOR] Correcting few WARN log lines in DistributedHerder#handleRebalance

2023-06-30 Thread via GitHub


C0urante merged PR #13939:
URL: https://github.com/apache/kafka/pull/13939





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248071005


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -1932,6 +1961,435 @@ public void testPartitionAssignorExceptionOnRegularHeartbeat() {
 .setTopicPartitions(Collections.emptyList())));
 }
 
+@Test
+public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+// Create a context with one consumer group containing one member.
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setTargetMemberEpoch(10)
+.setClientId("client")
+.setClientHost("localhost/127.0.0.1")
+.setRebalanceTimeoutMs(5000)
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setServerAssignorName("range")
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2)))
+.build())
+.withAssignment(memberId, mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2)))
+.withAssignmentEpoch(10)
+.withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
+{
+// foo only has 3 partitions stored in the metadata but foo has
+// 6 partitions in the metadata image.
+put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
+}
+}))
+.build();
+
+// The metadata refresh flag should be true.
+ConsumerGroup consumerGroup = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
+
assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+// Prepare the assignment result.
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Heartbeat.
+CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(

Review Comment:
   I see we test the heartbeat logic here now.






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager

2023-06-30 Thread via GitHub


divijvaidya commented on code in PR #13837:
URL: https://github.com/apache/kafka/pull/13837#discussion_r1247912475


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteTopicPartitionDirectory.java:
##
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Topic;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static java.util.Arrays.asList;
+import static java.util.Objects.requireNonNull;
+import static java.util.regex.Pattern.compile;
+import static java.util.stream.Collectors.toSet;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.getUuid;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.deleteFilesOnly;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.deleteQuietly;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Represents a topic-partition directory in the local tiered storage under 
which filesets for
+ * log segments are stored.
+ *
+ *
+ * 
+ * / storage-directory / uuidBase64-0-topic / tvHCaSDsQZWsjr5rbtCjxA-segment
+ * .   .  tvHCaSDsQZWsjr5rbtCjxA-offset_index
+ * .   .  tvHCaSDsQZWsjr5rbtCjxA-time_index
+ * .
+ * / 5fEBmixCR5-dMntYSLIr1g-3-topic / BFyXlC8ySMm-Uzxw5lZSMg-segment
+ *  . BFyXlC8ySMm-Uzxw5lZSMg-offset_index
+ *  . BFyXlC8ySMm-Uzxw5lZSMg-time_index
+ * 
+ */
+public final class RemoteTopicPartitionDirectory {
+private static final Logger LOGGER = 
getLogger(RemoteTopicPartitionDirectory.class);
+private static final String UUID_LEGAL_CHARS = "[a-zA-Z0-9_-]{22}";

Review Comment:
   `_` (underscore) is not a valid UUID char as per 
https://www.ietf.org/rfc/rfc4122.txt
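
   For context on where the 22-character pattern can come from: if these 
directory names embed Kafka's `Uuid#toString` output (URL-safe Base64 without 
padding) rather than the RFC 4122 hex form, that alphabet does include `-` and 
`_`. A small standalone check (illustrative only, not project code):
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.Base64;
   import java.util.UUID;

   public class UuidCharCheck {
       public static void main(String[] args) {
           UUID uuid = UUID.randomUUID();
           ByteBuffer buf = ByteBuffer.allocate(16);
           buf.putLong(uuid.getMostSignificantBits());
           buf.putLong(uuid.getLeastSignificantBits());
           // URL-safe Base64 of 16 bytes, without padding, is always 22
           // characters drawn from [A-Za-z0-9_-].
           String encoded = Base64.getUrlEncoder().withoutPadding()
                   .encodeToString(buf.array());
           System.out.println(encoded + " matches: "
                   + encoded.matches("[a-zA-Z0-9_-]{22}"));
       }
   }
   ```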



##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;

[GitHub] [kafka] vamossagar12 commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils

2023-06-30 Thread via GitHub


vamossagar12 commented on PR #13158:
URL: https://github.com/apache/kafka/pull/13158#issuecomment-1614890801

   Thanks @fvaleri. Hmm, I see 101 test failures: 92 existing and 9 new. At 
least the new ones look unrelated.





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248060783


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
 assertTrue(group.hasMetadataExpired(time.milliseconds()));
 assertEquals(time.milliseconds() + 1000, 
group.metadataRefreshDeadline().deadlineMs);
 assertEquals(group.groupEpoch() + 1, 
group.metadataRefreshDeadline().epoch);
+
+// Set the refresh deadline.
+group.setMetadataRefreshDeadline(time.milliseconds() + 1000, 
group.groupEpoch());

Review Comment:
   We can just go back an epoch? 






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13939: [MINOR] Correcting few WARN log lines in DistributedHerder#handleRebalance

2023-06-30 Thread via GitHub


vamossagar12 commented on code in PR #13939:
URL: https://github.com/apache/kafka/pull/13939#discussion_r1248058129


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1657,13 +1657,13 @@ private boolean handleRebalanceCompleted() {
 if (assignment.failed()) {
 needsRejoin = true;
 if (isLeader()) {
-log.warn("Join group completed, but assignment failed and we 
are the leader. Reading to end of config and retrying.");
+log.warn("Join group completed, but the assignment failed and 
we are the leader. Reading to end of config and retrying.");
 needsReadToEnd = true;
 } else if (configState.offset() < assignment.offset()) {
-log.warn("Join group completed, but assignment failed and we 
lagging. Reading to end of config and retrying.");
+log.warn("Join group completed, but the assignment failed and 
we are lagging. Reading to end of config and retrying.");

Review Comment:
   Removed the unwanted changes.






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248058020


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -825,4 +828,60 @@ public void testClose() throws Exception {
 assertFutureThrows(write1, NotCoordinatorException.class);
 assertFutureThrows(write2, NotCoordinatorException.class);
 }
+
+@Test
+public void testOnNewMetadataImage() {
+TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+MockPartitionWriter writer = mock(MockPartitionWriter.class);
+MockCoordinatorBuilderSupplier supplier = 
mock(MockCoordinatorBuilderSupplier.class);
+MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+CoordinatorRuntime<MockCoordinator, String> runtime =
+new CoordinatorRuntime.Builder<MockCoordinator, String>()
+.withLoader(loader)
+.withEventProcessor(new MockEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorBuilderSupplier(supplier)
+.build();
+
+MockCoordinator coordinator0 = mock(MockCoordinator.class);
+MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+when(supplier.get()).thenReturn(builder);
+when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+when(builder.withLogContext(any())).thenReturn(builder);
+when(builder.build())
+.thenReturn(coordinator0)
+.thenReturn(coordinator1);
+
+CompletableFuture<Void> future0 = new CompletableFuture<>();
+when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+CompletableFuture<Void> future1 = new CompletableFuture<>();
+when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+runtime.scheduleLoadOperation(tp0, 0);
+runtime.scheduleLoadOperation(tp1, 0);
+
+// Coordinator 0 is loaded. It should get the current image
+// that is the empty one.
+future0.complete(null);
+verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+// Publish a new image.
+MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+runtime.onNewMetadataImage(newImage, delta);
+
+// Coordinator 0 should be notified about it.
+verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment:
   Yeah. I think I understand that the metadata image is updated, but I wasn't 
sure if we had anything ensuring that the new metadata image will also trigger 
the refresh of the subscription metadata. (Apologies if this was just in a 
previous pr)






[GitHub] [kafka] fvaleri commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils

2023-06-30 Thread via GitHub


fvaleri commented on PR #13158:
URL: https://github.com/apache/kafka/pull/13158#issuecomment-1614872160

   @vamossagar12 LGTM. Thanks.





[jira] [Commented] (KAFKA-15102) Mirror Maker 2 - KIP690 backward compatibility

2023-06-30 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739146#comment-17739146
 ] 

Chris Egerton commented on KAFKA-15102:
---

[~omnia_h_ibrahim] Good call; we should definitely update the compatibility 
section in the KIP to mention this. We may also want to list the affected 
versions and link to this issue for further context.

> Mirror Maker 2 - KIP690 backward compatibility
> --
>
> Key: KAFKA-15102
> URL: https://issues.apache.org/jira/browse/KAFKA-15102
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.1.0
>Reporter: David Dufour
>Priority: Major
>
> According to KIP690, "When users upgrade an existing MM2 cluster they don’t 
> need to change any of their current configuration as this proposal maintains 
> the default behaviour for MM2."
> Now that the separator is subject to customization, when an MM2 upgrade is 
> performed and the separator was customized with replication.policy.separator, 
> the name of the internal offset-syncs topic changes. It then generates issues 
> like:
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.InvalidTopicException: Topic 
> 'mm2-offset-syncs_bkts28_internal' collides with existing topics: 
> mm2-offset-syncs.bkts28.internal
> It has been observed that replication can then break, sometimes several days 
> after the upgrade (reason not identified). Deleting the topic with the old 
> name allows it to recover.
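
For illustration, the collision happens because the broker treats '.' and '_' 
as the same character when checking topic names (both map to '_' in metric 
names). A standalone sketch of the check (illustrative only, not MM2 code; the 
topic names are taken from the error above):

{code:java}
public class TopicCollisionDemo {
    public static void main(String[] args) {
        String oldName = "mm2-offset-syncs.bkts28.internal"; // default '.' separator
        String newName = "mm2-offset-syncs_bkts28_internal"; // customized '_' separator
        // Kafka's collision rule: names that are equal after mapping '.' to '_'
        // collide, which is why creating the new topic fails with
        // InvalidTopicException while the old one still exists.
        boolean collides = oldName.replace('.', '_').equals(newName.replace('.', '_'));
        System.out.println(collides); // true
    }
}
{code}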





[GitHub] [kafka] C0urante commented on a diff in pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

2023-06-30 Thread via GitHub


C0urante commented on code in PR #13913:
URL: https://github.com/apache/kafka/pull/13913#discussion_r1248004857


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -582,12 +583,20 @@ void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
 }
 
 private void updateTopicAcls(List<AclBinding> bindings) {
-log.trace("Syncing {} topic ACL bindings.", bindings.size());
-targetAdminClient.createAcls(bindings).values().forEach((k, v) -> 
v.whenComplete((x, e) -> {
-if (e != null) {
-log.warn("Could not sync ACL of topic {}.", 
k.pattern().name(), e);
-}
-}));
+Set<AclBinding> addBindings = new HashSet<>();
+addBindings.addAll(bindings);

Review Comment:
   Nit: can simplify
   ```suggestion
   Set<AclBinding> addBindings = new HashSet<>(bindings);
   ```



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -582,12 +583,20 @@ void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
 }
 
 private void updateTopicAcls(List<AclBinding> bindings) {
-log.trace("Syncing {} topic ACL bindings.", bindings.size());
-targetAdminClient.createAcls(bindings).values().forEach((k, v) -> 
v.whenComplete((x, e) -> {
-if (e != null) {
-log.warn("Could not sync ACL of topic {}.", 
k.pattern().name(), e);
-}
-}));
+Set<AclBinding> addBindings = new HashSet<>();
+addBindings.addAll(bindings);
+addBindings.removeAll(knownTopicAclBindings);
+if (!addBindings.isEmpty()) {
+log.info("Syncing new found {} topic ACL bindings.", 
addBindings.size());
+targetAdminClient.createAcls(addBindings).values().forEach((k, v) 
-> v.whenComplete((x, e) -> {
+if (e != null) {
+log.warn("Could not sync ACL of topic {}.", 
k.pattern().name(), e);
+}
+}));
+knownTopicAclBindings = bindings;

Review Comment:
   ```suggestion
   knownTopicAclBindings = new HashSet<>(bindings);
   ```



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -582,12 +583,20 @@ void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
 }
 
 private void updateTopicAcls(List<AclBinding> bindings) {
-log.trace("Syncing {} topic ACL bindings.", bindings.size());
-targetAdminClient.createAcls(bindings).values().forEach((k, v) -> 
v.whenComplete((x, e) -> {
-if (e != null) {
-log.warn("Could not sync ACL of topic {}.", 
k.pattern().name(), e);
-}
-}));
+Set<AclBinding> addBindings = new HashSet<>();
+addBindings.addAll(bindings);
+addBindings.removeAll(knownTopicAclBindings);

Review Comment:
   My IDE nagged me about possible slow performance for invoking 
`Set::removeAll` with a `List` as an argument. Some research led to [this 
fascinating blog 
post](https://codeblog.jonskeet.uk/2010/07/29/there-s-a-hole-in-my-abstraction-dear-liza-dear-liza/).
   
   I think the scenario described there isn't likely to impact us frequently, 
but just in case, do you think we can change `knownTopicAclBindings` from a 
`List` to a `Set`?
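
   For background, the slow path comes from `AbstractSet#removeAll` switching 
its iteration strategy based on relative sizes; a self-contained illustration 
(the sizes are arbitrary):
   
   ```java
   import java.util.ArrayList;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   public class RemoveAllDemo {
       public static void main(String[] args) {
           int n = 20_000;
           List<Integer> list = new ArrayList<>();
           for (int i = 0; i < n; i++) list.add(i);

           // Set larger than the argument: removeAll iterates the argument
           // and calls set.remove(e) -> roughly O(list size).
           Set<Integer> bigSet = new HashSet<>();
           for (int i = 0; i < 2 * n; i++) bigSet.add(i);
           bigSet.removeAll(list); // fast path

           // Set smaller than (or equal to) the argument: removeAll iterates
           // the set and calls list.contains(e) -> O(set size * list size).
           Set<Integer> smallSet = new HashSet<>();
           for (int i = 0; i < n / 2; i++) smallSet.add(i);
           smallSet.removeAll(list); // quadratic with a List argument
       }
   }
   ```
   
   Passing a `HashSet` instead of a `List` keeps `contains` O(1) on both paths, 
which is the motivation for the suggestion above.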






[jira] [Created] (KAFKA-15137) Don't log the entire request in KRaftControllerChannelManager

2023-06-30 Thread David Arthur (Jira)
David Arthur created KAFKA-15137:


 Summary: Don't log the entire request in 
KRaftControllerChannelManager
 Key: KAFKA-15137
 URL: https://issues.apache.org/jira/browse/KAFKA-15137
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.5.0, 3.6.0
Reporter: David Arthur
Assignee: Alyssa Huang
 Fix For: 3.5.1


While debugging some junit tests, I noticed some really long log lines in 
KRaftControllerChannelManager. When the broker is down, we log a WARN that 
includes the entire UpdateMetadataRequest or LeaderAndIsrRequest. For large 
clusters, these can be really large requests, so this could potentially cause 
excessive output in the log4j logs.





[jira] [Updated] (KAFKA-13988) Mirrormaker 2 auto.offset.reset=latest not working

2023-06-30 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-13988:
--
Component/s: (was: KafkaConnect)

> Mirrormaker 2 auto.offset.reset=latest not working
> --
>
> Key: KAFKA-13988
> URL: https://issues.apache.org/jira/browse/KAFKA-13988
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.2.0
> Environment: Source Kafka cluster running on Ubuntu 20
> Source Kafka cluster Kafka v0.10
> Target Kafka cluster running in AWS MSK
> Target Kafka cluster Kafka v2.6.2
> Mirrormaker version 3.2.0 running on Ubuntu 20.
>Reporter: Daniel Florek
>Assignee: Ravindranath Kakarla
>Priority: Major
>
> Hi. 
> I have a problem setting up mirroring with MM2 from the latest offset between 2 
> clusters. In the logs I can see that the consumer consuming the topics has the 
> auto.offset.reset property set to latest, but the topics are still read from 
> offset 0. I am using the following configuration:
>  
> {code:java}
> clusters = A, B
> A.bootstrap.servers = broker-01A:9092
> B.bootstrap.servers = broker-01B:9092,broker-02B:9092,broker-03B:9092
> replication.policy.class = 
> org.apache.kafka.connect.mirror.IdentityReplicationPolicy
> #Enable replication between clusters and define topics which should be 
> replicated
> A->B.enabled = true
> A->B.topics = .*
> A->B.replication.factor=3
> A->B.emit.heartbeats.enabled = true
> A->B.emit.checkpoints.enabled = true
> auto.offset.reset=latest
> consumer.auto.offset.reset=latest
> A.consumer.auto.offset.reset=latest
> B.consumer.auto.offset.reset=latest
> refresh.topics.enabled=true
> heartbeats.topic.replication.factor=1
> checkpoints.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> config.storage.replication.factor = 1
> offset.storage.replication.factor = 1
> status.storage.replication.factor = 1 {code}
> I am using Kafka 3.2.0 for MirrorMaker 2. The source Kafka cluster is 1 broker 
> running on an EC2 instance in AWS (quite an old version, I think 0.10). The 
> target Kafka cluster contains 3 brokers running in AWS MSK (version 2.6.2). 
> Could you point out what I am doing wrong? Or is this possibly a bug?
>  





[jira] [Updated] (KAFKA-13988) Mirrormaker 2 auto.offset.reset=latest not working

2023-06-30 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-13988:
--
Fix Version/s: (was: 3.2.0)

> Mirrormaker 2 auto.offset.reset=latest not working
> --
>
> Key: KAFKA-13988
> URL: https://issues.apache.org/jira/browse/KAFKA-13988
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 3.2.0
> Environment: Source Kafka cluster running on Ubuntu 20
> Source Kafka cluster Kafka v0.10
> Target Kafka cluster running in AWS MSK
> Target Kafka cluster Kafka v2.6.2
> Mirrormaker version 3.2.0 running on Ubuntu 20.
>Reporter: Daniel Florek
>Assignee: Ravindranath Kakarla
>Priority: Major
>
> Hi. 
> I have a problem setting up mirroring with MM2 from the latest offset between 2 
> clusters. In the logs I can see that the consumer consuming the topics has the 
> auto.offset.reset property set to latest, but the topics are still read from 
> offset 0. I am using the following configuration:
>  
> {code:java}
> clusters = A, B
> A.bootstrap.servers = broker-01A:9092
> B.bootstrap.servers = broker-01B:9092,broker-02B:9092,broker-03B:9092
> replication.policy.class = 
> org.apache.kafka.connect.mirror.IdentityReplicationPolicy
> #Enable replication between clusters and define topics which should be 
> replicated
> A->B.enabled = true
> A->B.topics = .*
> A->B.replication.factor=3
> A->B.emit.heartbeats.enabled = true
> A->B.emit.checkpoints.enabled = true
> auto.offset.reset=latest
> consumer.auto.offset.reset=latest
> A.consumer.auto.offset.reset=latest
> B.consumer.auto.offset.reset=latest
> refresh.topics.enabled=true
> heartbeats.topic.replication.factor=1
> checkpoints.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> config.storage.replication.factor = 1
> offset.storage.replication.factor = 1
> status.storage.replication.factor = 1 {code}
> I am using Kafka 3.2.0 for MirrorMaker 2. The source Kafka cluster is 1 broker 
> running on an EC2 instance in AWS (quite an old version, I think 0.10). The 
> target Kafka cluster contains 3 brokers running in AWS MSK (version 2.6.2). 
> Could you point out what I am doing wrong? Or is this possibly a bug?
>  





[GitHub] [kafka] C0urante commented on a diff in pull request #13905: KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set

2023-06-30 Thread via GitHub


C0urante commented on code in PR #13905:
URL: https://github.com/apache/kafka/pull/13905#discussion_r1247963748


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##
@@ -53,6 +53,7 @@ public class MirrorSourceTask extends SourceTask {
 private static final Logger log = 
LoggerFactory.getLogger(MirrorSourceTask.class);
 
 private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;
+public static final long NON_EXISTING_OFFSET_VALUE = -1L;

Review Comment:
   Perhaps instead of a single sentinel value to denote uncommitted offsets, we 
can reject all values less than zero and replace this constant with a method? 
That could also help the readability of some of the Java 8 streams logic.
   
   ```java
   private boolean isUncommitted(Long offset) {
   return offset == null || offset < 0;
   }
   ```






[GitHub] [kafka] C0urante commented on a diff in pull request #13905: KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set

2023-06-30 Thread via GitHub


C0urante commented on code in PR #13905:
URL: https://github.com/apache/kafka/pull/13905#discussion_r1247968250


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java:
##
@@ -31,25 +31,36 @@
 import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState;
 import org.apache.kafka.connect.source.SourceRecord;
 
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+import org.mockito.internal.stubbing.answers.CallsRealMethods;
+import org.mockito.internal.util.collections.Sets;

Review Comment:
   Nit: probably better to avoid depending on internal packages if we can. I've 
left some suggestions below on how to do this without much additional work.



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##
@@ -106,13 +107,28 @@ public void start(Map<String, String> props) {
 Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);
 consumer.assign(topicPartitionOffsets.keySet());
 log.info("Starting with {} previously uncommitted partitions.", 
topicPartitionOffsets.entrySet().stream()
-.filter(x -> x.getValue() == 0L).count());
-log.trace("Seeking offsets: {}", topicPartitionOffsets);
-topicPartitionOffsets.forEach(consumer::seek);
+.filter(x -> x.getValue() == NON_EXISTING_OFFSET_VALUE).count());

Review Comment:
   Nit: we can clean this up if we use the `isUncommitted` method
   
   ```suggestion
   log.info("Starting with {} previously uncommitted partitions.", 
topicPartitionOffsets.values().stream()
   .filter(this::isUncommitted).count());
   ```



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java:
##
@@ -214,6 +225,82 @@ public void testPoll() {
 }
 }
 
+@Test
+public void testSeekBehaviorDuringStart() {
+// Setting up mock behavior.
+@SuppressWarnings("unchecked")
+KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+
+@SuppressWarnings("unchecked")
+KafkaProducer<byte[], byte[]> mockProducer = mock(KafkaProducer.class);
+
+String sourceClusterName = "sourceCluster";
+MirrorSourceMetrics mockMetrics = mock(MirrorSourceMetrics.class);
+
+SourceTaskContext mockSourceTaskContext = 
mock(SourceTaskContext.class);
+OffsetStorageReader mockOffsetStorageReader = 
mock(OffsetStorageReader.class);
+
when(mockSourceTaskContext.offsetStorageReader()).thenReturn(mockOffsetStorageReader);
+
+MockedStatic<MirrorUtils> mockMirrorUtils = mockStatic(MirrorUtils.class, new CallsRealMethods());
+mockMirrorUtils.when(() -> 
MirrorUtils.newConsumer(anyMap())).thenReturn(mockConsumer);
+mockMirrorUtils.when(() -> 
MirrorUtils.newProducer(anyMap())).thenReturn(mockProducer);
+
+Set<TopicPartition> topicPartitions = Sets.newSet(

Review Comment:
   ```suggestion
   Set<TopicPartition> topicPartitions = new HashSet<>(Arrays.asList(
   ```
   
   (will also require a couple new imports and an additional trailing 
parenthesis)



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##
@@ -106,13 +107,28 @@ public void start(Map<String, String> props) {
 Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);
 consumer.assign(topicPartitionOffsets.keySet());
 log.info("Starting with {} previously uncommitted partitions.", 
topicPartitionOffsets.entrySet().stream()
-.filter(x -> x.getValue() == 0L).count());
-log.trace("Seeking offsets: {}", topicPartitionOffsets);
-topicPartitionOffsets.forEach(consumer::seek);
+.filter(x -> x.getValue() == NON_EXISTING_OFFSET_VALUE).count());
+
+log.trace("Seeking offsets: {}", 
topicPartitionOffsets.entrySet().stream()
+.filter(topicPartitionOffset ->
+topicPartitionOffset.getValue() != 
NON_EXISTING_OFFSET_VALUE));
+
+topicPartitionOffsets.forEach(this::maybeSeek);
 log.info("{} replicating {} topic-partitions {}->{}: {}.", 
Thread.currentThread().getName(),
 taskTopicPartitions.size(), sourceClusterAlias, 
config.targetClusterAlias(), taskTopicPartitions);
 }
 
+private void maybeSeek(TopicPartition topicPartition, Long offset) {
+// Do not call seek on partitions that don't have an existing offset 
committed.
+if (offset == NON_EXISTING_OFFSET_VALUE) {

Review Comment:
   If we use `isUncommitted`:
   ```suggestion
   if (isUncommitted(offset)) {
   ```



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##
@@ -106,13 +107,28 @@ public void start(Map<String, String> props) {
 Map topicPartitionOffsets = 

[GitHub] [kafka] hudeqi commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown

2023-06-30 Thread via GitHub


hudeqi commented on PR #13926:
URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614799513

   And this please @divijvaidya 





[GitHub] [kafka] hudeqi commented on pull request #13929: KAFKA-15129;[3/N] Remove metrics in AbstractFetcherManager when fetcher manager instance shutdown

2023-06-30 Thread via GitHub


hudeqi commented on PR #13929:
URL: https://github.com/apache/kafka/pull/13929#issuecomment-1614790587

   And please help to review this @divijvaidya 





[GitHub] [kafka] hudeqi commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown

2023-06-30 Thread via GitHub


hudeqi commented on PR #13926:
URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614782255

   Combined with the explanation in 
https://github.com/apache/kafka/pull/3506#discussion_r128589927 and the results 
of my actual test, I think this is the case: the unit tests in 
`GroupMetadataManagerTest` construct a new `GroupMetadataManager` many times, 
and since `newGauge` registers against a global registry, earlier registrations 
affect the subsequent metric verification (the `testMetrics()` method) across 
unit tests.
   I remember that before opening this PR I tried replacing the gauge directly 
with "recreateGauge", and the unit test failed. Now, because I added 
`removeMetrics()` to `groupMetadataManager.shutdown()` (also called in 
`@AfterEach`), it has the same effect as "recreateGauge". @clolov @dajac 





[GitHub] [kafka] dajac commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown

2023-06-30 Thread via GitHub


dajac commented on PR #13926:
URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614765471

   Yeah, that seems to be a general issue with Yammer based metrics. It is not 
about individual unit tests. It is about integration tests that create multiple 
KafkaServers. In this case, the metric registry is shared by all instances:
   
   ```
   public final <T> Gauge<T> newGauge(String name, Gauge<T> metric, Map<String, String> tags) {
       return KafkaYammerMetrics.defaultRegistry().newGauge(metricName(name, tags), metric);
   }
   ```
   
   That being said, it seems that `newGauge` here uses `getOrAdd` internally in 
the registry, so it may be fine in recent versions of Yammer. It may be worth 
checking whether this is always true (it may depend on the version).
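
   To make the `getOrAdd` point concrete, a small standalone sketch (the metric 
group/type/name below are made up, and the behavior shown assumes a registry 
version where `newGauge` is backed by `getOrAdd`, as discussed above):
   
   ```java
   import com.yammer.metrics.core.Gauge;
   import com.yammer.metrics.core.MetricName;
   import com.yammer.metrics.core.MetricsRegistry;

   public class SharedRegistryDemo {
       public static void main(String[] args) {
           // Stands in for the JVM-wide default registry shared by all
           // KafkaServer instances in an integration test.
           MetricsRegistry registry = new MetricsRegistry();
           MetricName name = new MetricName("kafka.coordinator.group",
                   "GroupMetadataManager", "NumGroups");

           Gauge<Integer> first = registry.newGauge(name, new Gauge<Integer>() {
               @Override public Integer value() { return 1; }
           });
           // A second registration under the same name returns the existing
           // gauge, so the newer instance's gauge is silently ignored.
           Gauge<Integer> second = registry.newGauge(name, new Gauge<Integer>() {
               @Override public Integer value() { return 2; }
           });
           System.out.println(first == second); // true when newGauge is getOrAdd
       }
   }
   ```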





[GitHub] [kafka] clolov commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown

2023-06-30 Thread via GitHub


clolov commented on PR #13926:
URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614755545

   Hudeqi mentioned this comment as well, but I still do not understand how 
this could happen - is it that we keep the same metric registry between 
individual unit tests? If this is the case, don't we have this problem for 
other groups of metrics as well?





[GitHub] [kafka] hudeqi commented on a diff in pull request #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown

2023-06-30 Thread via GitHub


hudeqi commented on code in PR #13924:
URL: https://github.com/apache/kafka/pull/13924#discussion_r1247947969


##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -83,11 +84,19 @@ class LogCleanerTest {
   val numMetricsRegistered = LogCleaner.MetricNames.size
   verify(mockMetricsGroup, 
times(numMetricsRegistered)).newGauge(anyString(), any())
   
-  // verify that each metric is removed
+  // verify that each metric in `LogCleaner` is removed
   LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
 
+  // verify that each metric in `LogCleanerManager` is removed
+  val mockLogCleanerManagerMetricsGroup = 
mockMetricsGroupCtor.constructed.get(1)
+  LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName => 
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName),
 any()))
+  
LogCleanerManager.GaugeMetricNameWithTag.keySet().asScala.foreach(metricName => 
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName),
 any(), any()))

Review Comment:
   I have updated this. @divijvaidya 






[GitHub] [kafka] hudeqi commented on a diff in pull request #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown

2023-06-30 Thread via GitHub


hudeqi commented on code in PR #13924:
URL: https://github.com/apache/kafka/pull/13924#discussion_r1247947464


##
core/src/main/scala/kafka/log/LogCleanerManager.scala:
##
@@ -88,17 +88,21 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   /* for coordinating the pausing and the cleaning of a partition */
   private val pausedCleaningCond = lock.newCondition()
 
+  // Avoid adding legacy tags for a metric when initializing 
`LogCleanerManager`
+  GaugeMetricNameWithTag.clear()
   /* gauges for tracking the number of partitions marked as uncleanable for 
each log directory */
   for (dir <- logDirs) {
-metricsGroup.newGauge("uncleanable-partitions-count",
+metricsGroup.newGauge(UncleanablePartitionsCountMetricName,
   () => inLock(lock) { 
uncleanablePartitions.get(dir.getAbsolutePath).map(_.size).getOrElse(0) },
   Map("logDirectory" -> dir.getAbsolutePath).asJava
 )
+
GaugeMetricNameWithTag.computeIfAbsent(UncleanablePartitionsCountMetricName, k 
=> new java.util.ArrayList[java.util.Map[String, String]]())
+  .add(Map("logDirectory" -> dir.getAbsolutePath).asJava)

Review Comment:
   committed the update.






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown

2023-06-30 Thread via GitHub


divijvaidya commented on code in PR #13924:
URL: https://github.com/apache/kafka/pull/13924#discussion_r1247921648


##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -83,11 +84,19 @@ class LogCleanerTest {
   val numMetricsRegistered = LogCleaner.MetricNames.size
   verify(mockMetricsGroup, 
times(numMetricsRegistered)).newGauge(anyString(), any())
   
-  // verify that each metric is removed
+  // verify that each metric in `LogCleaner` is removed
   LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
 
+  // verify that each metric in `LogCleanerManager` is removed
+  val mockLogCleanerManagerMetricsGroup = 
mockMetricsGroupCtor.constructed.get(1)
+  LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName => 
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName),
 any()))
+  
LogCleanerManager.GaugeMetricNameWithTag.keySet().asScala.foreach(metricName => 
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName),
 any(), any()))

Review Comment:
   Ah ok. Can we please unit test this with multiple dirs, since the metric 
addition/removal here depends on the number of directories and we want to 
ensure it works correctly for the multi-dir case as well.






[GitHub] [kafka] C0urante commented on a diff in pull request #13939: [MINOR] Correcting few WARN log lines in DistributedHerder#handleRebalance

2023-06-30 Thread via GitHub


C0urante commented on code in PR #13939:
URL: https://github.com/apache/kafka/pull/13939#discussion_r1247916426


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1657,13 +1657,13 @@ private boolean handleRebalanceCompleted() {
 if (assignment.failed()) {
 needsRejoin = true;
 if (isLeader()) {
-log.warn("Join group completed, but assignment failed and we 
are the leader. Reading to end of config and retrying.");
+log.warn("Join group completed, but the assignment failed and 
we are the leader. Reading to end of config and retrying.");
 needsReadToEnd = true;
 } else if (configState.offset() < assignment.offset()) {
-log.warn("Join group completed, but assignment failed and we 
lagging. Reading to end of config and retrying.");
+log.warn("Join group completed, but the assignment failed and 
we are lagging. Reading to end of config and retrying.");

Review Comment:
   I think the only change we need here is "we are lagging"; "assignment" is 
fine without a preceding "the".






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on a diff in pull request #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown

2023-06-30 Thread via GitHub


hudeqi commented on code in PR #13924:
URL: https://github.com/apache/kafka/pull/13924#discussion_r1247914999


##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -83,11 +84,19 @@ class LogCleanerTest {
   val numMetricsRegistered = LogCleaner.MetricNames.size
   verify(mockMetricsGroup, 
times(numMetricsRegistered)).newGauge(anyString(), any())
   
-  // verify that each metric is removed
+  // verify that each metric in `LogCleaner` is removed
   LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
 
+  // verify that each metric in `LogCleanerManager` is removed
+  val mockLogCleanerManagerMetricsGroup = 
mockMetricsGroupCtor.constructed.get(1)
+  LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName => 
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName),
 any()))
+  
LogCleanerManager.GaugeMetricNameWithTag.keySet().asScala.foreach(metricName => 
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName),
 any(), any()))

Review Comment:
   > GaugeMetricNameWithTag can contain multiple metrics with the same name 
but different tags. Let's say `UncleanablePartitionsCountMetricName` is the 
name and the tag is the directory path. Here, we are verifying that there is 
exactly one call to newGauge() with `UncleanablePartitionsCountMetricName` as 
the metric name. But that is not correct, because we actually call `newGauge` 
for this key name numDirs times.
   > 
   > Hence, this test should ideally fail. What am I missing here?
   
   I think the reason is that `logDirs = Array(TestUtils.tempDir())` means 
`numDirs` is 1. @divijvaidya 
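   
   For anyone following the thread, a minimal standalone sketch of the 
Mockito semantics under discussion; `MetricsGroup` here is a hypothetical 
stand-in for the real metrics group class, used only for illustration:
   
   ```java
   import static org.mockito.ArgumentMatchers.any;
   import static org.mockito.ArgumentMatchers.eq;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.verify;
   
   import java.util.Collections;
   import java.util.Map;
   
   public class VerifyCountSketch {
       // Hypothetical stand-in for the metrics group, for illustration only.
       interface MetricsGroup {
           void newGauge(String name, Object gauge, Map<String, String> tags);
       }
   
       public static void main(String[] args) {
           MetricsGroup group = mock(MetricsGroup.class);
   
           // With a single log directory the gauge is registered exactly once...
           group.newGauge("uncleanable-partitions-count", new Object(),
                   Collections.singletonMap("logDirectory", "/tmp/dir-0"));
   
           // ...so a bare verify(...) passes: it defaults to times(1).
           verify(group).newGauge(eq("uncleanable-partitions-count"), any(), any());
   
           // Registering the same gauge for a second directory would make the
           // bare verify(...) throw TooManyActualInvocations; a multi-dir test
           // needs verify(group, times(numDirs)).newGauge(eq(name), any(), any()).
       }
   }
   ```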



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown

2023-06-30 Thread via GitHub


divijvaidya commented on code in PR #13924:
URL: https://github.com/apache/kafka/pull/13924#discussion_r1247907443


##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -83,11 +84,19 @@ class LogCleanerTest {
   val numMetricsRegistered = LogCleaner.MetricNames.size
   verify(mockMetricsGroup, 
times(numMetricsRegistered)).newGauge(anyString(), any())
   
-  // verify that each metric is removed
+  // verify that each metric in `LogCleaner` is removed
   LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
 
+  // verify that each metric in `LogCleanerManager` is removed
+  val mockLogCleanerManagerMetricsGroup = 
mockMetricsGroupCtor.constructed.get(1)
+  LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName => 
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName),
 any()))
+  
LogCleanerManager.GaugeMetricNameWithTag.keySet().asScala.foreach(metricName => 
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName),
 any(), any()))

Review Comment:
   GaugeMetricNameWithTag can contain multiple metrics with the same name but 
different tags. Let's say `UncleanablePartitionsCountMetricName` is the name 
and the tag is the directory path. Here, we are verifying that there is 
exactly one call to newGauge() with `UncleanablePartitionsCountMetricName` as 
the metric name. But that is not correct, because we actually call `newGauge` 
for this key name numDirs times. 
   
   Hence, this test should ideally fail. What am I missing here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown

2023-06-30 Thread via GitHub


dajac commented on PR #13926:
URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614681662

   Found this: https://github.com/apache/kafka/pull/3506#discussion_r128589927.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.

2023-06-30 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739095#comment-17739095
 ] 

Chris Egerton commented on KAFKA-15127:
---

In that case, perhaps we can leave it unassigned and note in the description 
that we'd like to let the initial offset management API soak for a bit to 
gather user feedback before pursuing this?

> Allow offsets to be reset at the same time a connector is deleted.
> --
>
> Key: KAFKA-15127
> URL: https://issues.apache.org/jira/browse/KAFKA-15127
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: needs-kip
>
> This has been listed as [Future 
> Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors]
>  in KIP-875. Now that the delete offsets mechanism is also in place, we can 
> take this up which will allow connector names to be reused after connector 
> deletion. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] clolov commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown

2023-06-30 Thread via GitHub


clolov commented on PR #13926:
URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614673915

   But if this is the case, won't we run into the same problem for at least 
one other subset of metrics? Also, if this is the case, that would mean that 
if we remove the method, the tests running as part of the auto-build will 
start failing, no?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown

2023-06-30 Thread via GitHub


clolov commented on code in PR #13924:
URL: https://github.com/apache/kafka/pull/13924#discussion_r1247880517


##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -83,11 +84,19 @@ class LogCleanerTest {
   val numMetricsRegistered = LogCleaner.MetricNames.size
   verify(mockMetricsGroup, 
times(numMetricsRegistered)).newGauge(anyString(), any())
   
-  // verify that each metric is removed
+  // verify that each metric in `LogCleaner` is removed
   LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
 
+  // verify that each metric in `LogCleanerManager` is removed
+  val mockLogCleanerManagerMetricsGroup = 
mockMetricsGroupCtor.constructed.get(1)
+  LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName => 
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName),
 any()))
+  
LogCleanerManager.GaugeMetricNameWithTag.keySet().asScala.foreach(metricName => 
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName),
 any(), any()))

Review Comment:
   Sorry, why do you think we will call this multiple times with the same 
metric name? The way I read this code is that we will get the keys from 
GaugeMetricNameWithTag and GaugeMetricNameNoTag (which are unique because of 
the nature of a set and map) and for each one we will verify that it is called 
only once.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown

2023-06-30 Thread via GitHub


dajac commented on PR #13926:
URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614670671

   I was wondering if it is because we run multiple brokers in the same JVM in 
tests but I am not sure.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-06-30 Thread via GitHub


jeqo commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1247799637


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -152,19 +156,23 @@ public class RemoteLogManager implements Closeable {
  * @param time  Time instance.
  * @param clusterId The cluster id.
  * @param fetchLog  function to get UnifiedLog instance for a given topic.
+ * @param updateRemoteLogStartOffset function to update the 
log-start-offset for a given topic partition.
  */
 public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
 int brokerId,
 String logDir,
 String clusterId,
 Time time,
-Function<TopicPartition, Optional<UnifiedLog>> fetchLog) {
+Function<TopicPartition, Optional<UnifiedLog>> fetchLog,
+BiConsumer<TopicPartition, Long> updateRemoteLogStartOffset) {

Review Comment:
   nit: 
   ```suggestion
   BiConsumer<TopicPartition, Long> updateLogStartOffsetFromRemoteTier) {
   ```



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -634,6 +642,241 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+if (isLeader()) {
+logger.debug("Updating {} with remoteLogStartOffset: {}", 
topicPartition, remoteLogStartOffset);
+updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+}
+}
+
+class RemoteLogRetentionHandler {
+
+private final Optional<RetentionSizeData> retentionSizeData;
+private final Optional<RetentionTimeData> retentionTimeData;
+
+private long remainingBreachedSize;
+
+private OptionalLong logStartOffset = OptionalLong.empty();
+
+public RemoteLogRetentionHandler(Optional<RetentionSizeData> 
retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+this.retentionSizeData = retentionSizeData;
+this.retentionTimeData = retentionTimeData;
+remainingBreachedSize = retentionSizeData.map(sizeData -> 
sizeData.remainingBreachedSize).orElse(0L);
+}
+
+private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws 
RemoteStorageException, ExecutionException, InterruptedException {
+if (!retentionSizeData.isPresent()) {
+return false;
+}
+
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+// Assumption that segments contain size >= 0
+if (remainingBreachedSize > 0) {
+long remainingBytes = remainingBreachedSize - 
x.segmentSizeInBytes();
+if (remainingBytes >= 0) {
+remainingBreachedSize = remainingBytes;
+return true;
+}
+}
+
+return false;
+});
+if (isSegmentDeleted) {
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+logger.info("Deleted remote log segment {} due to 
retention size {} breach. Log size after deletion will be {}.",
+metadata.remoteLogSegmentId(), 
retentionSizeData.get().retentionSize, remainingBreachedSize + 
retentionSizeData.get().retentionSize);
+}
+return isSegmentDeleted;
+}
+
+public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
+if (!retentionTimeData.isPresent()) {
+return false;
+}
+
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+x -> x.maxTimestampMs() <= 
retentionTimeData.get().cleanupUntilMs);
+if (isSegmentDeleted) {
+remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+// It is fine to have logStartOffset as 
`metadata.endOffset() + 1` as the segment offset intervals
+// are ascending within an epoch.
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+logger.info("Deleted remote log segment {} due to 
retention time {}ms breach based on the largest record timestamp in the 
segment",
+metadata.remoteLogSegmentId(), 
retentionTimeData.get().retentionMs);
+}
+return isSegmentDeleted;
+}
+
+private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+throws 

[GitHub] [kafka] clolov commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown

2023-06-30 Thread via GitHub


clolov commented on PR #13926:
URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614665719

   Okay, that makes sense @dajac, do you happen to know (or are able to deduce) 
the answer to the other question about why we needed the recreateGauge method 
in the first place and is it safe to get rid of it now?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown

2023-06-30 Thread via GitHub


divijvaidya commented on code in PR #13924:
URL: https://github.com/apache/kafka/pull/13924#discussion_r1247863305


##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -83,11 +84,19 @@ class LogCleanerTest {
   val numMetricsRegistered = LogCleaner.MetricNames.size
   verify(mockMetricsGroup, 
times(numMetricsRegistered)).newGauge(anyString(), any())
   
-  // verify that each metric is removed
+  // verify that each metric in `LogCleaner` is removed
   LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
 
+  // verify that each metric in `LogCleanerManager` is removed
+  val mockLogCleanerManagerMetricsGroup = 
mockMetricsGroupCtor.constructed.get(1)
+  LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName => 
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName),
 any()))
+  
LogCleanerManager.GaugeMetricNameWithTag.keySet().asScala.foreach(metricName => 
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName),
 any(), any()))

Review Comment:
   does this work? In our case we will call this multiple times with the same 
metric name. The any() will match all such occurrences, whereas we are saying 
that this invocation occurs only once (the default is times(1) when not 
specified).
   
   Please help me understand why it's working.



##
core/src/main/scala/kafka/log/LogCleanerManager.scala:
##
@@ -88,17 +88,21 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   /* for coordinating the pausing and the cleaning of a partition */
   private val pausedCleaningCond = lock.newCondition()
 
+  // Avoid adding legacy tags for a metric when initializing 
`LogCleanerManager`
+  GaugeMetricNameWithTag.clear()
   /* gauges for tracking the number of partitions marked as uncleanable for 
each log directory */
   for (dir <- logDirs) {
-metricsGroup.newGauge("uncleanable-partitions-count",
+metricsGroup.newGauge(UncleanablePartitionsCountMetricName,
   () => inLock(lock) { 
uncleanablePartitions.get(dir.getAbsolutePath).map(_.size).getOrElse(0) },
   Map("logDirectory" -> dir.getAbsolutePath).asJava
 )
+
GaugeMetricNameWithTag.computeIfAbsent(UncleanablePartitionsCountMetricName, k 
=> new java.util.ArrayList[java.util.Map[String, String]]())
+  .add(Map("logDirectory" -> dir.getAbsolutePath).asJava)

Review Comment:
   could we move the `Map("logDirectory" -> dir.getAbsolutePath)` part to a 
val `metricTag` and then reuse it in both locations?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-30 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1247814090


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -266,9 +295,21 @@ public CompletableFuture<JoinGroupResponseData> joinGroup(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+CompletableFuture<JoinGroupResponseData> responseFuture = new 
CompletableFuture<>();
+
+if (!isValidGroupId(request.groupId(), 
ApiKeys.forId(request.apiKey()))) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(request.memberId())
+.setErrorCode(Errors.INVALID_GROUP_ID.code()));
+
+return responseFuture;
+}
+
+runtime.scheduleWriteOperation("generic-group-join",

Review Comment:
   I wonder if we need to handle the future returned by 
`scheduleWriteOperation` as well. At minimum, we may want to react to errors. 
This could for instance happen if something goes wrong before the join group 
handling is even triggered.
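   
   A minimal, self-contained sketch of that pattern (reacting to a failed 
scheduled operation so the response future cannot hang forever); the futures 
below are generic stand-ins, not the coordinator runtime API:
   
   ```java
   import java.util.concurrent.CompletableFuture;
   
   public class WriteOperationErrorSketch {
       public static void main(String[] args) {
           // Stand-ins: the future returned by the scheduled write operation,
           // and the response future handed back to the request layer.
           CompletableFuture<Void> writeFuture = new CompletableFuture<>();
           CompletableFuture<String> responseFuture = new CompletableFuture<>();
   
           // React to errors: if the write fails before the join-group handler
           // even runs, fail the response too instead of leaving it pending.
           writeFuture.exceptionally(exception -> {
               if (!responseFuture.isDone()) {
                   responseFuture.completeExceptionally(exception);
               }
               return null;
           });
   
           writeFuture.completeExceptionally(new IllegalStateException("write failed"));
           System.out.println(responseFuture.isCompletedExceptionally()); // prints: true
       }
   }
   ```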



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -578,4 +619,19 @@ public void shutdown() {
 Utils.closeQuietly(runtime, "coordinator runtime");
 log.info("Shutdown complete.");
 }
+
+private boolean isValidGroupId(String groupId, ApiKeys api) {
+if (api == ApiKeys.OFFSET_COMMIT ||
+api == ApiKeys.OFFSET_FETCH ||
+api == ApiKeys.DESCRIBE_GROUPS ||
+api == ApiKeys.DELETE_GROUPS

Review Comment:
   nit: I am not a fan of this validation. I wonder if we should just have two 
helpers: `isGroupIdNotNull` and `isGroupIdNotEmpty`. In this PR, we would only 
need `isGroupIdNotEmpty`. What do you think?
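   
   A sketch of the two helpers, using the names from the comment; the bodies 
are assumptions, since only the names are given:
   
   ```java
   public final class GroupIdValidation {
       private GroupIdValidation() { }
   
       // Weakest check: tolerate the legacy empty group id, reject only null.
       static boolean isGroupIdNotNull(String groupId) {
           return groupId != null;
       }
   
       // Stricter check, the one JoinGroup would need per the comment.
       static boolean isGroupIdNotEmpty(String groupId) {
           return groupId != null && !groupId.isEmpty();
       }
   }
   ```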



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1087,1348 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+/**
+ * Replays GroupMetadataKey/Value to update the soft state of
+ * the generic group.
+ *
+ * @param key   A GroupMetadataKey key.
+ * @param value A GroupMetadataValue record.
+ */
+public void replay(
+GroupMetadataKey key,
+GroupMetadataValue value,
+short version
+) {
+String groupId = key.group();
+
+if (value == null)  {
+// Tombstone. Group should not be added.
+} else {
+List<GenericGroupMember> loadedMembers = new ArrayList<>();
+for (GroupMetadataValue.MemberMetadata member : value.members()) {
+int rebalanceTimeout = version == 0 ? member.sessionTimeout() 
: member.rebalanceTimeout();

Review Comment:
   I wonder if we could avoid passing the version to this method by adding `-1` 
as the default value of `rebalanceTimeout` in `GroupMetadataValue`. It seems 
that we could rely on this to decide here.
   
   Another way that I was thinking about would be to pass the `Record` to the 
replay method as it contains all the available information. Have you considered 
this?
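   
   A sketch of the version-free variant the first suggestion describes, 
assuming (hypothetically) that `rebalanceTimeout` defaults to `-1` for 
records written at version 0:
   
   ```java
   public class RebalanceTimeoutSketch {
       // Hypothetical stand-in for GroupMetadataValue.MemberMetadata.
       static class MemberMetadata {
           final int sessionTimeout;
           final int rebalanceTimeout; // -1 would mean "not present in the record"
   
           MemberMetadata(int sessionTimeout, int rebalanceTimeout) {
               this.sessionTimeout = sessionTimeout;
               this.rebalanceTimeout = rebalanceTimeout;
           }
       }
   
       // With the -1 default, replay no longer needs the record version at all.
       static int effectiveRebalanceTimeout(MemberMetadata member) {
           return member.rebalanceTimeout == -1 ? member.sessionTimeout : member.rebalanceTimeout;
       }
   
       public static void main(String[] args) {
           System.out.println(effectiveRebalanceTimeout(new MemberMetadata(30_000, -1)));     // 30000
           System.out.println(effectiveRebalanceTimeout(new MemberMetadata(30_000, 60_000))); // 60000
       }
   }
   ```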



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1087,1348 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+/**
+ * Replays GroupMetadataKey/Value to update the soft state of
+ * the generic group.
+ *
+ * @param key   A GroupMetadataKey key.
+ * @param value A GroupMetadataValue record.
+ */
+public void replay(
+GroupMetadataKey key,
+GroupMetadataValue value,
+short version
+) {
+String groupId = key.group();
+
+if (value == null)  {
+// Tombstone. Group should not be added.

Review Comment:
   I think that the group should be deleted in this case.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -266,9 +295,21 @@ public CompletableFuture<JoinGroupResponseData> joinGroup(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+CompletableFuture<JoinGroupResponseData> responseFuture = new 
CompletableFuture<>();
+
+if (!isValidGroupId(request.groupId(), 
ApiKeys.forId(request.apiKey()))) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(request.memberId())
+.setErrorCode(Errors.INVALID_GROUP_ID.code()));
+
+return responseFuture;
+}
+
+runtime.scheduleGenericGroupOperation("generic-group-join",

[GitHub] [kafka] dajac commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown

2023-06-30 Thread via GitHub


dajac commented on PR #13926:
URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614635663

   @clolov Constants (in the companion object) in Scala start with a capital 
letter in our code base.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown

2023-06-30 Thread via GitHub


clolov commented on PR #13926:
URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614621247

   In theory this change makes sense to me. As with the [1/N] one, I would 
prefer it if variable names started with a lowercase letter unless there is a 
good reason for them not to. I have reached out to @guozhangwang, who was also 
wondering what the purpose of the recreateGauge method is. I thought that 
there is only ever one GroupMetadataManager, so I do not understand the 
purpose of this method in the first place, and I would like to get 
confirmation before we remove it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kriscfoster commented on pull request #13865: KAFKA-15096: Update snappy-java to 1.1.10.1

2023-06-30 Thread via GitHub


kriscfoster commented on PR #13865:
URL: https://github.com/apache/kafka/pull/13865#issuecomment-1614606533

   Great, thank you @jlprat!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jlprat commented on pull request #13865: KAFKA-15096: Update snappy-java to 1.1.10.1

2023-06-30 Thread via GitHub


jlprat commented on PR #13865:
URL: https://github.com/apache/kafka/pull/13865#issuecomment-1614604961

   Hi @kriscfoster
   Currently, `3.5.1` and `3.6.0` are the only releases in the making that 
I'm aware of (`3.6.0` is expected to be released during September).
   
   I think the version that would be released the soonest with this patch is 
`3.5.1`. If you need this fixed for `3.4.x`, you can do a jar replacement of 
`snappy-java` with the `1.1.10.1` version.
   If there were high demand from the community for a new `3.4.x` version, a 
maintainer might pick up the task of creating a new release.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya merged pull request #13923: KAFKA-15131: Improve RemoteStorageManager exception handling documentation

2023-06-30 Thread via GitHub


divijvaidya merged PR #13923:
URL: https://github.com/apache/kafka/pull/13923


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kriscfoster commented on pull request #13865: KAFKA-15096: Update snappy-java to 1.1.10.1

2023-06-30 Thread via GitHub


kriscfoster commented on PR #13865:
URL: https://github.com/apache/kafka/pull/13865#issuecomment-1614589914

   @jlprat do you know when the next release will be on 3.4 branch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager

2023-06-30 Thread via GitHub


showuon commented on code in PR #13837:
URL: https://github.com/apache/kafka/pull/13837#discussion_r1247809160


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java:
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Optional;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static java.util.Optional.ofNullable;
+
+/**
+ * Represents an interaction between a broker and a second-tier storage. This 
type of event is generated
+ * by the {@link LocalTieredStorage} which is an implementation of the {@link 
RemoteStorageManager}
+ * operating in Kafka's runtime as the interface between Kafka and external 
storage systems, through
+ * which all such interactions go.
+ */
+public final class LocalTieredStorageEvent implements 
Comparable<LocalTieredStorageEvent> {
+
+/**
+ * The nature of the interaction.
+ */
+public enum EventType {
+OFFLOAD_SEGMENT,
+FETCH_SEGMENT,
+FETCH_OFFSET_INDEX,
+FETCH_TIME_INDEX,
+FETCH_TRANSACTION_INDEX,
+FETCH_LEADER_EPOCH_CHECKPOINT,
+FETCH_PRODUCER_SNAPSHOT,
+DELETE_SEGMENT,
+DELETE_PARTITION
+}
+
+private final int brokerId;
+private final EventType type;
+private final RemoteLogSegmentId segmentId;
+private final int timestamp;
+private final Optional<RemoteLogSegmentFileset> fileset;
+private final Optional<RemoteLogSegmentMetadata> metadata;
+private final int startPosition;
+private final int endPosition;
+private final Optional<Exception> exception;
+
+/**
+ * Assess whether this event matches the characteristics of an event 
specified by the {@code condition}.
+ *
+ * @param condition The condition which contains the characteristics to 
match.
+ * @return true if this event matches the condition's characteristics, 
false otherwise.
+ */
+public boolean matches(final LocalTieredStorageCondition condition) {
+if (brokerId != condition.brokerId) {
+return false;
+}
+if (condition.eventType != type) {
+return false;
+}
+if 
(!segmentId.topicIdPartition().topicPartition().equals(condition.topicPartition))
 {
+return false;
+}
+if (!exception.map(e -> condition.failed).orElseGet(() -> 
!condition.failed)) {
+return false;
+}
+return true;
+}
+
+/**
+ * Returns whether the provided {@code event} was created after the 
present event.
+ * This assumes a chronological ordering of events.
+ * Both events need to be generated from the same broker.
+ *
+ * @param event The event to compare
+ * @return true if the current instance was generated after the given 
{@code event},
+ * false if events are equal or the current instance was generated 
before the
+ * given {@code event}.
+ */
+public boolean isAfter(final LocalTieredStorageEvent event) {
+return event.timestamp < timestamp;
+}
+
+public EventType getType() {
+return type;
+}
+
+public TopicPartition getTopicPartition() {
+return segmentId.topicIdPartition().topicPartition();
+}
+
+@Override
+public int compareTo(LocalTieredStorageEvent other) {
+requireNonNull(other);
+
+if (other.timestamp > timestamp) {
+return -1;
+}
+if (other.timestamp < timestamp) {
+return 1;
+}
+return 0;

Review Comment:
   We can directly return 
   ```
   return timestamp - other.timestamp;
   ```
   > Compares this object with the specified object for order. Returns a 
negative integer, zero, or a positive integer as this object is less than, 
equal to, or greater than the specified object.
   
   ref: 
https://docs.oracle.com/javase/8/docs/api/java/lang/Comparable.html#compareTo-T-
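   
   One caveat worth noting as an aside: for `int` fields the subtraction 
idiom can overflow when the two values are far apart, so `Integer.compare` is 
a safer spelling of the same idea. A standalone sketch:
   
   ```java
   public class CompareSketch {
       private final int timestamp;
   
       CompareSketch(int timestamp) {
           this.timestamp = timestamp;
       }
   
       // Equivalent to (timestamp - other.timestamp) when no overflow occurs,
       // and still correct when the subtraction would wrap around.
       int compareTo(CompareSketch other) {
           return Integer.compare(timestamp, other.timestamp);
       }
   
       public static void main(String[] args) {
           System.out.println(new CompareSketch(1).compareTo(new CompareSketch(2)));                 // -1
           System.out.println(new CompareSketch(Integer.MIN_VALUE).compareTo(new CompareSketch(1))); // -1, no wrap
       }
   }
   ```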
   



##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java:
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation 

[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-30 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1247805182


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -172,19 +186,21 @@ public List<Record> build(TopicsImage topicsImage) {
 });
 
 // Add subscription metadata.
-Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
-members.forEach((memberId, member) -> {
-member.subscribedTopicNames().forEach(topicName -> {
-TopicImage topicImage = topicsImage.getTopic(topicName);
-if (topicImage != null) {
-subscriptionMetadata.put(topicName, new TopicMetadata(
-topicImage.id(),
-topicImage.name(),
-topicImage.partitions().size()
-));
-}
+if (subscriptionMetadata == null) {

Review Comment:
   Most of the tests are just fine with the auto-generated subscription 
metadata. However, the new ones need specific subscription metadata to verify 
the check. This is why I extended this builder to either accept the 
subscription metadata to use or to auto-generate it.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -825,4 +828,60 @@ public void testClose() throws Exception {
 assertFutureThrows(write1, NotCoordinatorException.class);
 assertFutureThrows(write2, NotCoordinatorException.class);
 }
+
+@Test
+public void testOnNewMetadataImage() {
+TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+MockPartitionWriter writer = mock(MockPartitionWriter.class);
+MockCoordinatorBuilderSupplier supplier = 
mock(MockCoordinatorBuilderSupplier.class);
+MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withLoader(loader)
+.withEventProcessor(new MockEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorBuilderSupplier(supplier)
+.build();
+
+MockCoordinator coordinator0 = mock(MockCoordinator.class);
+MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+when(supplier.get()).thenReturn(builder);
+when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+when(builder.withLogContext(any())).thenReturn(builder);
+when(builder.build())
+.thenReturn(coordinator0)
+.thenReturn(coordinator1);
+
+CompletableFuture<Void> future0 = new CompletableFuture<>();
+when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+CompletableFuture future1 = new CompletableFuture<>();
+when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+runtime.scheduleLoadOperation(tp0, 0);
+runtime.scheduleLoadOperation(tp1, 0);
+
+// Coordinator 0 is loaded. It should get the current image
+// that is the empty one.
+future0.complete(null);
+verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+// Publish a new image.
+MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+runtime.onNewMetadataImage(newImage, delta);
+
+// Coordinator 0 should be notified about it.
+verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment:
   We cannot test this here because the runtime is not aware of the concrete 
implementation of the state machine. I also want to ensure that we are on the 
same page. The metadata image is updated when `onNewMetadataImage` is called 
but the subscription metadata is refreshed on the next heartbeat.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on a diff in pull request #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown

2023-06-30 Thread via GitHub


hudeqi commented on code in PR #13924:
URL: https://github.com/apache/kafka/pull/13924#discussion_r1247795935


##
core/src/main/scala/kafka/log/LogCleanerManager.scala:
##
@@ -555,6 +568,17 @@ private case class OffsetsToClean(firstDirtyOffset: Long,
 }
 
 private[log] object LogCleanerManager extends Logging {
+  private val UncleanablePartitionsCountMetricName = 
"uncleanable-partitions-count"

Review Comment:
   > Is there a reason why you decided to have these variable names start 
with a capital letter?
   
   I learned the Scala naming convention from discussions like this PR: 
https://github.com/apache/kafka/pull/13623#discussion_r1182593025  @clolov 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown

2023-06-30 Thread via GitHub


clolov commented on code in PR #13924:
URL: https://github.com/apache/kafka/pull/13924#discussion_r1247788765


##
core/src/main/scala/kafka/log/LogCleanerManager.scala:
##
@@ -555,6 +568,17 @@ private case class OffsetsToClean(firstDirtyOffset: Long,
 }
 
 private[log] object LogCleanerManager extends Logging {
+  private val UncleanablePartitionsCountMetricName = 
"uncleanable-partitions-count"

Review Comment:
   Is there a reason why you decided to have these variable names start with 
a capital letter?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo commented on a diff in pull request #13938: KAFKA-15135: fix(storage): pass endpoint configurations as client commont to TBRLMM

2023-06-30 Thread via GitHub


jeqo commented on code in PR #13938:
URL: https://github.com/apache/kafka/pull/13938#discussion_r1247786162


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -220,8 +220,8 @@ void testRemoteLogMetadataManagerWithEndpointConfig() {
 
 ArgumentCaptor<Map<String, Object>> capture = 
ArgumentCaptor.forClass(Map.class);
 verify(remoteLogMetadataManager, 
times(1)).configure(capture.capture());
-assertEquals(host + ":" + port, 
capture.getValue().get("bootstrap.servers"));
-assertEquals(securityProtocol, 
capture.getValue().get("security.protocol"));
+assertEquals(host + ":" + port, 
capture.getValue().get("remote.log.metadata.common.client.bootstrap.servers"));
+assertEquals(securityProtocol, 
capture.getValue().get("remote.log.metadata.common.client.security.protocol"));

Review Comment:
   Sure, I was following the literal text instead, but we can use the 
constant here as well. Applying the fix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils

2023-06-30 Thread via GitHub


vamossagar12 commented on PR #13158:
URL: https://github.com/apache/kafka/pull/13158#issuecomment-1614540440

   @ruslankrivoshein , I have fixed the checkstyle issues. Also, I believe that 
the other comment 
[here](https://github.com/apache/kafka/pull/13158#issuecomment-1422555044) has 
been addressed by you in the PR https://github.com/apache/kafka/pull/13562. 
   Sorry for the long wait here; it just went off my radar. @fvaleri, do you 
think this is looking fine now?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 opened a new pull request, #13939: [MINOR] Correcting few WARN log lines in DistributedHerder#handleRebalance

2023-06-30 Thread via GitHub


vamossagar12 opened a new pull request, #13939:
URL: https://github.com/apache/kafka/pull/13939

   Minor improvements to comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 closed pull request #12874: Reproducing callbacks added even when workthread is terminated

2023-06-30 Thread via GitHub


vamossagar12 closed pull request #12874: Reproducing callbacks added even when 
workthread is terminated
URL: https://github.com/apache/kafka/pull/12874


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #12874: Reproducing callbacks added even when workthread is terminated

2023-06-30 Thread via GitHub


vamossagar12 commented on PR #12874:
URL: https://github.com/apache/kafka/pull/12874#issuecomment-1614498011

   Closing, as the main purpose of this was to do a POC.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13938: KAFKA-15135: fix(storage): pass endpoint configurations as client commont to TBRLMM

2023-06-30 Thread via GitHub


showuon commented on code in PR #13938:
URL: https://github.com/apache/kafka/pull/13938#discussion_r1247736125


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -220,8 +220,8 @@ void testRemoteLogMetadataManagerWithEndpointConfig() {
 
 ArgumentCaptor<Map<String, Object>> capture = 
ArgumentCaptor.forClass(Map.class);
 verify(remoteLogMetadataManager, 
times(1)).configure(capture.capture());
-assertEquals(host + ":" + port, 
capture.getValue().get("bootstrap.servers"));
-assertEquals(securityProtocol, 
capture.getValue().get("security.protocol"));
+assertEquals(host + ":" + port, 
capture.getValue().get("remote.log.metadata.common.client.bootstrap.servers"));
+assertEquals(securityProtocol, 
capture.getValue().get("remote.log.metadata.common.client.security.protocol"));

Review Comment:
   Any reason why we don't use `REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX` as 
other places do?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13923: KAFKA-15131: Improve RemoteStorageManager exception handling documentation

2023-06-30 Thread via GitHub


satishd commented on code in PR #13923:
URL: https://github.com/apache/kafka/pull/13923#discussion_r1247690846


##
storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java:
##
@@ -75,6 +75,9 @@ enum IndexType {
  * 
  * Invoker of this API should always send a unique id as part of {@link 
RemoteLogSegmentMetadata#remoteLogSegmentId()}
  * even when it retries to invoke this method for the same log segment 
data.
+ * 
+ * This operation is expected to be idempotent. If a copy operation is 
retried and there is existing content already written,

Review Comment:
   The newly added statement gives more clarity on idempotency. 
   
   The unique id sent as part of `RemoteLogSegmentMetadata` can be used to 
avoid overwrites in the RSM implementation. The caller makes sure it sends a 
unique id across multiple invocations.
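   
   To illustrate, a toy sketch of that idempotency idea, with an in-memory 
map standing in for the remote store; all names here are illustrative, not 
the RSM API:
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   
   public class IdempotentCopySketch {
       private final Map<String, byte[]> store = new ConcurrentHashMap<>();
   
       // Keying the upload by the unique segment id makes a retried copy land
       // on the same key, so retries rewrite rather than duplicate content.
       void copyLogSegmentData(String remoteLogSegmentId, byte[] segmentData) {
           store.put(remoteLogSegmentId, segmentData.clone());
       }
   
       public static void main(String[] args) {
           IdempotentCopySketch rsm = new IdempotentCopySketch();
           rsm.copyLogSegmentData("segment-uuid-1", new byte[]{1, 2, 3});
           rsm.copyLogSegmentData("segment-uuid-1", new byte[]{1, 2, 3}); // retry
           System.out.println(rsm.store.size()); // prints: 1, no duplicate
       }
   }
   ```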



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15136) The inspection of field allowAutoTopicCreation field in MetadataRequest is unreasonable

2023-06-30 Thread Xiaobing Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaobing Fang updated KAFKA-15136:
--
Priority: Major  (was: Minor)

> The inspection of field allowAutoTopicCreation field in MetadataRequest is 
> unreasonable
> ---
>
> Key: KAFKA-15136
> URL: https://issues.apache.org/jira/browse/KAFKA-15136
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Xiaobing Fang
>Priority: Major
>
>  
> {code:java}
> if (!data.allowAutoTopicCreation() && version < 4)
> throw new UnsupportedVersionException("MetadataRequest versions older 
> than 4 don't support the " +
> "allowAutoTopicCreation field"); {code}
>  
> Background:
> Based on my understanding, the code in MetadataRequest is intended to ignore 
> the allowAutoTopicCreation field when version≤4. However, if kafka server is 
> configured with "auto.create.topics.enable"=false and the client sets 
> allowAutoTopicCreation=false, UnsupportedVersionExceptionwill be thrown, 
> which is not expected.
>  
> Issues:
>  # `KafkaAdminClient#handleDescribeTopicsByNames()` throws an exception when 
> sending MetadataRequest to a lower version server, making it unusable.
>  # MetadataRequestTest avoids this issue by setting 
> `allowAutoTopicCreation=true` in tests for version≤4, but this is not 
> reasonable. And the comments in 
> [testAutoTopicCreation|https://github.com/apache/kafka/blob/1f4cbc5d53259031123b6e9e6bb9a5bbe1e084e8/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala#L146]
>  may also be problematic.
>  
> Solution:  
>  # Remove the checking code in MetadataRequest.
>  # Add a field `hasSetAllowAutoTopicCreation` in MetadataRequest. Only throw 
> an exception when `version≤4` and `hasSetAllowAutoTopicCreation=true`, 
> without considering the value of allowAutoTopicCreation field.
>  
> If there is indeed an issue, I can work on fixing it. Looking forward to your 
> reply.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15136) The inspection of field allowAutoTopicCreation field in MetadataRequest is unreasonable

2023-06-30 Thread Xiaobing Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaobing Fang updated KAFKA-15136:
--
Description: 
 
{code:java}
if (!data.allowAutoTopicCreation() && version < 4)
throw new UnsupportedVersionException("MetadataRequest versions older than 
4 don't support the " +
"allowAutoTopicCreation field"); {code}
 

Background:


Based on my understanding, the code in MetadataRequest is intended to ignore 
the allowAutoTopicCreation field when version≤4. However, if the Kafka server is 
configured with "auto.create.topics.enable"=false and the client sets 
allowAutoTopicCreation=false, UnsupportedVersionException will be thrown, which 
is not expected.

 

Issues:
 # `KafkaAdminClient#handleDescribeTopicsByNames()` throws an exception when 
sending MetadataRequest to a lower version server, making it unusable.
 # MetadataRequestTest avoids this issue by setting 
`allowAutoTopicCreation=true` in tests for version≤4, but this is not 
reasonable. And the comments in 
[testAutoTopicCreation|https://github.com/apache/kafka/blob/1f4cbc5d53259031123b6e9e6bb9a5bbe1e084e8/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala#L146]
 may also be problematic.

 

Solution:  
 # Remove the checking code in MetadataRequest.
 # Add a field `hasSetAllowAutoTopicCreation` in MetadataRequest. Only throw an 
exception when `version≤4` and `hasSetAllowAutoTopicCreation=true`, without 
considering the value of allowAutoTopicCreation field.

 

If there is indeed an issue, I can work on fixing it. Looking forward to your 
reply.

  was:
```java

if (!data.allowAutoTopicCreation() && version < 4)
throw new UnsupportedVersionException("MetadataRequest versions older than 4 
don't support the " +
"allowAutoTopicCreation field");

```

Background:
Based on my understanding, the code in MetadataRequest is intended to ignore 
the allowAutoTopicCreation field when version≤4. However, if the Kafka server is 
configured with "auto.create.topics.enable"=false and the client sets 
allowAutoTopicCreation=false, UnsupportedVersionException will be thrown, which 
is not expected.

Issues:
 # `KafkaAdminClient#handleDescribeTopicsByNames()` throws an exception when 
sending MetadataRequest to a lower version server, making it unusable.
 # MetadataRequestTest avoids this issue by setting 
`allowAutoTopicCreation=true` in tests for version≤4, but this is not 
reasonable. And the comments in 
[testAutoTopicCreation|https://github.com/apache/kafka/blob/1f4cbc5d53259031123b6e9e6bb9a5bbe1e084e8/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala#L146]
 may also be problematic.

Solution:  
 # Remove the checking code in MetadataRequest.
 # Add a field `hasSetAllowAutoTopicCreation` in MetadataRequest. Only throw an 
exception when `version≤4` and `hasSetAllowAutoTopicCreation=true`, without 
considering the value of allowAutoTopicCreation field.

 

If there is indeed an issue, I can work on fixing it. Looking forward to your 
reply.


> The inspection of field allowAutoTopicCreation field in MetadataRequest is 
> unreasonable
> ---
>
> Key: KAFKA-15136
> URL: https://issues.apache.org/jira/browse/KAFKA-15136
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Xiaobing Fang
>Priority: Minor
>
>  
> {code:java}
> if (!data.allowAutoTopicCreation() && version < 4)
> throw new UnsupportedVersionException("MetadataRequest versions older 
> than 4 don't support the " +
> "allowAutoTopicCreation field"); {code}
>  
> Background:
> Based on my understanding, the code in MetadataRequest is intended to ignore 
> the allowAutoTopicCreation field when version≤4. However, if kafka server is 
> configured with "auto.create.topics.enable"=false and the client sets 
> allowAutoTopicCreation=false, UnsupportedVersionExceptionwill be thrown, 
> which is not expected.
>  
> Issues:
>  # `KafkaAdminClient#handleDescribeTopicsByNames()` throws an exception when 
> sending MetadataRequest to a lower version server, making it unusable.
>  # MetadataRequestTest avoids this issue by setting 
> `allowAutoTopicCreation=true` in tests for version≤4, but this is not 
> reasonable. And the comments in 
> [testAutoTopicCreation|https://github.com/apache/kafka/blob/1f4cbc5d53259031123b6e9e6bb9a5bbe1e084e8/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala#L146]
>  may also be problematic.
>  
> Solution:  
>  # Remove the checking code in MetadataRequest.
>  # Add a field `hasSetAllowAutoTopicCreation` in MetadataRequest. Only throw 
> an exception when `version≤4` and `hasSetAllowAutoTopicCreation=true`, 
> without considering the value of allowAutoTopicCreation field.
>  
> If there is indeed an issue, I can work on fixing it. Looking forward to 

[jira] [Created] (KAFKA-15136) The inspection of field allowAutoTopicCreation field in MetadataRequest is unreasonable

2023-06-30 Thread Xiaobing Fang (Jira)
Xiaobing Fang created KAFKA-15136:
-

 Summary: The inspection of field allowAutoTopicCreation field in 
MetadataRequest is unreasonable
 Key: KAFKA-15136
 URL: https://issues.apache.org/jira/browse/KAFKA-15136
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Xiaobing Fang


```java
if (!data.allowAutoTopicCreation() && version < 4)
    throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " +
            "allowAutoTopicCreation field");
```

Background:
Based on my understanding, the check in MetadataRequest is intended to ignore 
the allowAutoTopicCreation field when version < 4. However, if the Kafka server 
is configured with "auto.create.topics.enable"=false and the client sets 
allowAutoTopicCreation=false, an UnsupportedVersionException is thrown, which 
is not expected.

Issues:
 # `KafkaAdminClient#handleDescribeTopicsByNames()` throws an exception when 
sending a MetadataRequest to a lower-version server, making it unusable.
 # MetadataRequestTest avoids this issue by setting 
`allowAutoTopicCreation=true` in tests for version < 4, which is not a 
reasonable workaround. The comments in 
[testAutoTopicCreation|https://github.com/apache/kafka/blob/1f4cbc5d53259031123b6e9e6bb9a5bbe1e084e8/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala#L146]
 may also be misleading.

Solution:  
 # Remove the existing check from MetadataRequest.
 # Add a field `hasSetAllowAutoTopicCreation` to MetadataRequest and only throw 
an exception when `version < 4` and `hasSetAllowAutoTopicCreation=true`, without 
considering the value of the allowAutoTopicCreation field.

 

If there is indeed an issue, I can work on fixing it. Looking forward to your 
reply.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeqo commented on pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm

2023-06-30 Thread via GitHub


jeqo commented on PR #13828:
URL: https://github.com/apache/kafka/pull/13828#issuecomment-1614448203

   Sure! Created this: https://github.com/apache/kafka/pull/13938


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo opened a new pull request, #13938: KAFKA-15135: fix(storage): pass endpoint configurations as client common to TBRLMM

2023-06-30 Thread via GitHub


jeqo opened a new pull request, #13938:
URL: https://github.com/apache/kafka/pull/13938

   Pass endpoint properties from RLM to TBRLMM and validate that they are not 
ignored. 
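
   For reference, an illustrative broker configuration exercising this path, 
assuming tiered storage with the topic-based RemoteLogMetadataManager (the 
listener name is an example value, not a default):
   ```
   # Enable tiered storage; the endpoint of the named listener is what RLM
   # passes through to TBRLMM as client configuration.
   remote.log.storage.system.enable=true
   remote.log.metadata.manager.listener.name=PLAINTEXT
   ```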
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13923: KAFKA-15131: Improve RemoteStorageManager exception handling documentation

2023-06-30 Thread via GitHub


satishd commented on code in PR #13923:
URL: https://github.com/apache/kafka/pull/13923#discussion_r1247695189


##
storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java:
##
@@ -120,7 +123,10 @@ InputStream fetchLogSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
  * @param indexTypetype of the index to be fetched for the 
segment.
  * @return input stream of the requested index.
  * @throws RemoteStorageException  if there are any errors while 
fetching the index.
- * @throws RemoteResourceNotFoundException when there are no resources 
associated with the given remoteLogSegmentMetadata.
+ * @throws RemoteResourceNotFoundException the requested index is not 
found in the remote storage
+ * (e.g. Transaction index may not exist because segments create prior to 
version 2.8.0 will not have transaction index associated with them.).

Review Comment:
   minor typo: `segments create prior to version` -> `segments created prior to 
version`
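   
   For illustration, a hedged sketch of caller-side handling that treats a 
missing transaction index as empty rather than as a failure (the `fetchIndex` 
call and `IndexType` come from the API being documented; the wrapper class and 
the fallback policy are illustrative only):
   ```java
   import java.io.InputStream;
   import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
   import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
   import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
   import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;

   final class OptionalIndexFetchSketch {
       // Segments created before 2.8.0 have no transaction index, so a
       // RemoteResourceNotFoundException here is mapped to an empty stream
       // instead of being propagated as a fetch failure.
       static InputStream fetchTxnIndexOrEmpty(RemoteStorageManager rsm,
                                               RemoteLogSegmentMetadata metadata)
               throws RemoteStorageException {
           try {
               return rsm.fetchIndex(metadata, RemoteStorageManager.IndexType.TRANSACTION);
           } catch (RemoteResourceNotFoundException e) {
               return InputStream.nullInputStream(); // requires Java 11+
           }
       }
   }
   ```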



##
storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java:
##
@@ -75,6 +75,9 @@ enum IndexType {
  * 
  * Invoker of this API should always send a unique id as part of {@link 
RemoteLogSegmentMetadata#remoteLogSegmentId()}
  * even when it retries to invoke this method for the same log segment 
data.
+ * 
+ * This operation is expected to be idempotent. If a copy operation is 
retried and there is existing content already written,

Review Comment:
   The newly added statement gives more clarity on idempotency. 
   
   The unique id sent as part of `RemoteLogSegmentMetadata` can be used to 
avoid overwrites in the RSM implementation; the caller makes sure it sends a 
unique id across invocations, even when it retries the same log segment data.
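   
   A minimal sketch of one way an RSM might key stored objects by that unique 
id so that writes never clobber already-committed content (the in-memory store 
and method names here are hypothetical, not part of the Kafka API):
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical stand-in for a remote object store. Objects are keyed by the
   // unique remote log segment id (a String for brevity), so writes for distinct
   // ids never collide, and a repeated write under the same id is a no-op rather
   // than an overwrite of existing content.
   final class SegmentStoreSketch {
       private final Map<String, byte[]> objects = new ConcurrentHashMap<>();

       /** Returns true if this call stored the segment, false if it was already present. */
       boolean copySegment(String remoteLogSegmentId, byte[] segmentData) {
           return objects.putIfAbsent(remoteLogSegmentId, segmentData) == null;
       }

       public static void main(String[] args) {
           SegmentStoreSketch store = new SegmentStoreSketch();
           byte[] data = {1, 2, 3};
           System.out.println(store.copySegment("segment-1", data)); // true: stored
           System.out.println(store.copySegment("segment-1", data)); // false: idempotent retry
       }
   }
   ```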



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


