kamalcph commented on code in PR #20811:
URL: https://github.com/apache/kafka/pull/20811#discussion_r2648864098


##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##########
@@ -370,4 +375,58 @@ public void testInitializationFailure() throws IOException, InterruptedException
             Exit.resetExitProcedure();
         }
     }
+
+    @ClusterTest
+    public void testRemoteLogMetadataTopicMinIsr() throws ExecutionException, InterruptedException {
+        // Initialize the manager which will create the __remote_log_metadata topic
+        TopicBasedRemoteLogMetadataManager rlmm = topicBasedRlmm();
+        verifyRemoteLogMetadataTopicMinIsr(rlmm, (short) 2, "default value");
+    }
+
+    @ClusterTest
+    public void testRemoteLogMetadataTopicMinIsrWithCustomValue() throws ExecutionException, InterruptedException, IOException {
+        // Create a manager with custom min.isr value
+        short customMinIsr = 3;
+        Map<String, Object> overrideProps = Map.of(
+            TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP, customMinIsr
+        );
+        
+        TopicBasedRemoteLogMetadataManager customRlmm = RemoteLogMetadataManagerTestUtils.builder()
+            .bootstrapServers(clusterInstance.bootstrapServers())
+            .overrideRemoteLogMetadataManagerProps(overrideProps)
+            .build();
+        
+        try {
+            verifyRemoteLogMetadataTopicMinIsr(customRlmm, customMinIsr, "custom value");
+        } finally {
+            customRlmm.close();
+        }

Review Comment:
   ```suggestion
           try (TopicBasedRemoteLogMetadataManager customRlmm = RemoteLogMetadataManagerTestUtils.builder()
                   .bootstrapServers(clusterInstance.bootstrapServers())
                   .overrideRemoteLogMetadataManagerProps(overrideProps)
                   .build()) {
               verifyRemoteLogMetadataTopicMinIsr(customRlmm, customMinIsr, "custom value");
           }
   ```
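
The suggestion above relies on try-with-resources closing the manager automatically, including when verification throws. A minimal sketch of that behavior follows; `FakeManager` is a hypothetical stand-in (not a Kafka class) that only records lifecycle events, so this illustrates the language feature rather than the actual test.

```java
import java.util.ArrayList;
import java.util.List;

public class TryWithResourcesSketch {

    // Hypothetical stand-in for a closeable manager; it only records what happens to it.
    static final class FakeManager implements AutoCloseable {
        private final List<String> events;

        FakeManager(List<String> events) {
            this.events = events;
            events.add("open");
        }

        void verify(boolean fail) {
            events.add("verify");
            if (fail) {
                throw new IllegalStateException("verification failed");
            }
        }

        @Override
        public void close() {
            events.add("close");
        }
    }

    // Runs the verification inside try-with-resources and returns the recorded events.
    static List<String> run(boolean fail) {
        List<String> events = new ArrayList<>();
        try (FakeManager manager = new FakeManager(events)) {
            manager.verify(fail);
        } catch (IllegalStateException e) {
            // The resource has already been closed by the time this handler runs.
            events.add("caught");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

In both runs `close` is recorded without an explicit `finally` block, which is the equivalence the suggested change depends on.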



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##########
@@ -370,4 +375,58 @@ public void testInitializationFailure() throws IOException, InterruptedException
             Exit.resetExitProcedure();
         }
     }
+
+    @ClusterTest
+    public void testRemoteLogMetadataTopicMinIsr() throws ExecutionException, InterruptedException {
+        // Initialize the manager which will create the __remote_log_metadata topic
+        TopicBasedRemoteLogMetadataManager rlmm = topicBasedRlmm();
+        verifyRemoteLogMetadataTopicMinIsr(rlmm, (short) 2, "default value");
+    }
+
+    @ClusterTest
+    public void testRemoteLogMetadataTopicMinIsrWithCustomValue() throws ExecutionException, InterruptedException, IOException {
+        // Create a manager with custom min.isr value
+        short customMinIsr = 3;
+        Map<String, Object> overrideProps = Map.of(
+            TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP, customMinIsr
+        );
+        
+        TopicBasedRemoteLogMetadataManager customRlmm = RemoteLogMetadataManagerTestUtils.builder()
+            .bootstrapServers(clusterInstance.bootstrapServers())
+            .overrideRemoteLogMetadataManagerProps(overrideProps)
+            .build();
+        
+        try {
+            verifyRemoteLogMetadataTopicMinIsr(customRlmm, customMinIsr, "custom value");
+        } finally {
+            customRlmm.close();
+        }
+    }
+
+    private void verifyRemoteLogMetadataTopicMinIsr(TopicBasedRemoteLogMetadataManager rlmm, 
+                                                     short expectedMinIsr, 
+                                                     String valueDescription) 
+            throws ExecutionException, InterruptedException {
+        try (Admin admin = clusterInstance.admin()) {
+            String metadataTopic = TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+            
+            // Wait for the topic to be created
+            // RemoteLogMetadataManagerTestUtils uses 3 partitions for the metadata topic
+            clusterInstance.waitTopicCreation(metadataTopic, 3);
+            
+            // Verify the topic exists
+            assertTrue(rlmm.doesTopicExist(admin, metadataTopic));
+            
+            // Describe the topic configs to verify min.insync.replicas
+            ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, metadataTopic);
+            DescribeConfigsResult describeResult = admin.describeConfigs(List.of(topicResource));
+            Config config = describeResult.all().get().get(topicResource);
+            
+            assertTrue(config != null, "Topic config should not be null");

Review Comment:
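   Please replace `assertTrue(config != null, "Topic config should not be null")` with `assertNotNull(config, "Topic config should not be null")`.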



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##########
@@ -370,4 +375,58 @@ public void testInitializationFailure() throws IOException, InterruptedException
             Exit.resetExitProcedure();
         }
     }
+
+    @ClusterTest
+    public void testRemoteLogMetadataTopicMinIsr() throws ExecutionException, InterruptedException {
+        // Initialize the manager which will create the __remote_log_metadata topic
+        TopicBasedRemoteLogMetadataManager rlmm = topicBasedRlmm();
+        verifyRemoteLogMetadataTopicMinIsr(rlmm, (short) 2, "default value");
+    }
+
+    @ClusterTest
+    public void testRemoteLogMetadataTopicMinIsrWithCustomValue() throws ExecutionException, InterruptedException, IOException {
+        // Create a manager with custom min.isr value
+        short customMinIsr = 3;
+        Map<String, Object> overrideProps = Map.of(
+            TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP, customMinIsr
+        );
+        
+        TopicBasedRemoteLogMetadataManager customRlmm = RemoteLogMetadataManagerTestUtils.builder()
+            .bootstrapServers(clusterInstance.bootstrapServers())
+            .overrideRemoteLogMetadataManagerProps(overrideProps)
+            .build();
+        
+        try {
+            verifyRemoteLogMetadataTopicMinIsr(customRlmm, customMinIsr, "custom value");
+        } finally {
+            customRlmm.close();
+        }
+    }
+
+    private void verifyRemoteLogMetadataTopicMinIsr(TopicBasedRemoteLogMetadataManager rlmm, 
+                                                     short expectedMinIsr, 
+                                                     String valueDescription) 

Review Comment:
   nit: align the parameter indentation.



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##########
@@ -370,4 +375,58 @@ public void testInitializationFailure() throws IOException, InterruptedException
             Exit.resetExitProcedure();
         }
     }
+
+    @ClusterTest
+    public void testRemoteLogMetadataTopicMinIsr() throws ExecutionException, InterruptedException {
+        // Initialize the manager which will create the __remote_log_metadata topic
+        TopicBasedRemoteLogMetadataManager rlmm = topicBasedRlmm();
+        verifyRemoteLogMetadataTopicMinIsr(rlmm, (short) 2, "default value");

Review Comment:
   ```suggestion
           verifyRemoteLogMetadataTopicMinIsr(remoteLogMetadataManager, (short) 2, "default value");
   ```



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##########
@@ -370,4 +375,58 @@ public void testInitializationFailure() throws IOException, InterruptedException
             Exit.resetExitProcedure();
         }
     }
+
+    @ClusterTest
+    public void testRemoteLogMetadataTopicMinIsr() throws ExecutionException, InterruptedException {
+        // Initialize the manager which will create the __remote_log_metadata topic
+        TopicBasedRemoteLogMetadataManager rlmm = topicBasedRlmm();
+        verifyRemoteLogMetadataTopicMinIsr(rlmm, (short) 2, "default value");
+    }
+
+    @ClusterTest
+    public void testRemoteLogMetadataTopicMinIsrWithCustomValue() throws ExecutionException, InterruptedException, IOException {
+        // Create a manager with custom min.isr value
+        short customMinIsr = 3;
+        Map<String, Object> overrideProps = Map.of(
+            TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP, customMinIsr
+        );
+        
+        TopicBasedRemoteLogMetadataManager customRlmm = RemoteLogMetadataManagerTestUtils.builder()
+            .bootstrapServers(clusterInstance.bootstrapServers())
+            .overrideRemoteLogMetadataManagerProps(overrideProps)
+            .build();
+        
+        try {
+            verifyRemoteLogMetadataTopicMinIsr(customRlmm, customMinIsr, "custom value");
+        } finally {
+            customRlmm.close();
+        }
+    }
+
+    private void verifyRemoteLogMetadataTopicMinIsr(TopicBasedRemoteLogMetadataManager rlmm, 
+                                                     short expectedMinIsr, 
+                                                     String valueDescription) 
+            throws ExecutionException, InterruptedException {
+        try (Admin admin = clusterInstance.admin()) {
+            String metadataTopic = TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+            
+            // Wait for the topic to be created
+            // RemoteLogMetadataManagerTestUtils uses 3 partitions for the metadata topic
+            clusterInstance.waitTopicCreation(metadataTopic, 3);

Review Comment:
   ```suggestion
               clusterInstance.waitTopicCreation(metadataTopic, RemoteLogMetadataManagerTestUtils.METADATA_TOPIC_PARTITIONS_COUNT);
   ```



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##########
@@ -370,4 +375,58 @@ public void testInitializationFailure() throws IOException, InterruptedException
             Exit.resetExitProcedure();
         }
     }
+
+    @ClusterTest
+    public void testRemoteLogMetadataTopicMinIsr() throws ExecutionException, InterruptedException {
+        // Initialize the manager which will create the __remote_log_metadata topic
+        TopicBasedRemoteLogMetadataManager rlmm = topicBasedRlmm();
+        verifyRemoteLogMetadataTopicMinIsr(rlmm, (short) 2, "default value");
+    }
+
+    @ClusterTest
+    public void testRemoteLogMetadataTopicMinIsrWithCustomValue() throws ExecutionException, InterruptedException, IOException {
+        // Create a manager with custom min.isr value
+        short customMinIsr = 3;
+        Map<String, Object> overrideProps = Map.of(
+            TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP, customMinIsr
+        );
+        
+        TopicBasedRemoteLogMetadataManager customRlmm = RemoteLogMetadataManagerTestUtils.builder()
+            .bootstrapServers(clusterInstance.bootstrapServers())
+            .overrideRemoteLogMetadataManagerProps(overrideProps)
+            .build();
+        
+        try {
+            verifyRemoteLogMetadataTopicMinIsr(customRlmm, customMinIsr, "custom value");
+        } finally {
+            customRlmm.close();
+        }
+    }
+
+    private void verifyRemoteLogMetadataTopicMinIsr(TopicBasedRemoteLogMetadataManager rlmm, 
+                                                     short expectedMinIsr, 
+                                                     String valueDescription) 
+            throws ExecutionException, InterruptedException {
+        try (Admin admin = clusterInstance.admin()) {
+            String metadataTopic = TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+            
+            // Wait for the topic to be created
+            // RemoteLogMetadataManagerTestUtils uses 3 partitions for the metadata topic
+            clusterInstance.waitTopicCreation(metadataTopic, 3);
+            
+            // Verify the topic exists
+            assertTrue(rlmm.doesTopicExist(admin, metadataTopic));
+            
+            // Describe the topic configs to verify min.insync.replicas
+            ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, metadataTopic);
+            DescribeConfigsResult describeResult = admin.describeConfigs(List.of(topicResource));
+            Config config = describeResult.all().get().get(topicResource);
+            
+            assertTrue(config != null, "Topic config should not be null");
+            ConfigEntry minIsrEntry = config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG);
+            assertTrue(minIsrEntry != null, "min.insync.replicas config should exist");

Review Comment:
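   Please replace `assertTrue(minIsrEntry != null, "min.insync.replicas config should exist")` with `assertNotNull(minIsrEntry, "min.insync.replicas config should exist")`.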



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java:
##########
@@ -192,4 +194,20 @@ private void assertMaskedSensitiveConfigurations(String configString) {
         Arrays.stream(sensitiveConfigKeys)
                 .forEach(config -> assertTrue(configString.contains(config + "=(redacted)")));
     }
+
+    @Test
+    public void testDefaultMinIsr() {
+        Map<String, Object> props = createValidConfigProps(new HashMap<>(), new HashMap<>(), new HashMap<>());

Review Comment:
   Can `new HashMap<>()` be replaced with `Collections.emptyMap()` here?
   
   
   ```suggestion
           Map<String, Object> props = createValidConfigProps(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
   ```
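
One caveat worth checking before applying this: `Collections.emptyMap()` returns an immutable map, so the swap is only safe if the helper reads its arguments and never writes into them. The sketch below illustrates the difference; `mergeReadOnly` and `mutatingHelperFails` are hypothetical helpers written for this example, and whether `createValidConfigProps` mutates its parameters is an assumption that needs to be confirmed against the test class.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class EmptyMapSketch {

    // Hypothetical helper that only reads its arguments and copies them into a fresh map.
    static Map<String, Object> mergeReadOnly(Map<String, Object> first, Map<String, Object> second) {
        Map<String, Object> merged = new HashMap<>();
        merged.putAll(first);
        merged.putAll(second);
        merged.put("base.key", "base-value");
        return merged;
    }

    // Hypothetical helper that writes into its argument; reports whether the write was rejected.
    static boolean mutatingHelperFails(Map<String, Object> props) {
        try {
            props.put("extra.key", "extra-value");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Read-only use works the same with an immutable empty map.
        System.out.println(mergeReadOnly(Collections.emptyMap(), Collections.emptyMap()));
        // Mutation is rejected for emptyMap() but accepted for a HashMap.
        System.out.println(mutatingHelperFails(Collections.emptyMap()));
        System.out.println(mutatingHelperFails(new HashMap<>()));
    }
}
```

If the helper turns out to write into its parameters, keeping `new HashMap<>()` is the safer choice.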



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java:
##########
@@ -192,4 +194,20 @@ private void assertMaskedSensitiveConfigurations(String configString) {
         Arrays.stream(sensitiveConfigKeys)
                 .forEach(config -> assertTrue(configString.contains(config + "=(redacted)")));
     }
+
+    @Test
+    public void testDefaultMinIsr() {
+        Map<String, Object> props = createValidConfigProps(new HashMap<>(), new HashMap<>(), new HashMap<>());
+        TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(props);
+        assertEquals(DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR, rlmmConfig.metadataTopicMinIsr());
+    }
+
+    @Test
+    public void testCustomMinIsr() {
+        Map<String, Object> props = createValidConfigProps(new HashMap<>(), new HashMap<>(), new HashMap<>());

Review Comment:
   Same as above:
   
   
   ```suggestion
           Map<String, Object> props = createValidConfigProps(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
