[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-11-14 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1022162537


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java:
##
@@ -77,20 +79,23 @@ public void testReplicationConfigProperties() {
 
 @Test
 public void testClientConfigProperties() {
+String clusterABBootstrap = "127.0.0.1:9092, 127.0.0.2:9092";

Review Comment:
   The test now passes on Jenkins. There are two unrelated failed test cases: 
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12577/31/testReport/](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12577/31/testReport/)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-11-14 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1021800352


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java:
##
@@ -77,20 +79,23 @@ public void testReplicationConfigProperties() {
 
 @Test
 public void testClientConfigProperties() {
+String clusterABBootstrap = "127.0.0.1:9092, 127.0.0.2:9092";

Review Comment:
   When I first wrote this test, it failed locally with a `ConfigException` 
from `ClientUtils.parseAndValidateAddresses` with the message 
`No resolvable bootstrap URLs given in bootstrap.servers`. Using IP addresses 
fixed the issue. The test now seems fine when I use the old bootstrap servers again. 
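
The failure mode described above can be illustrated with a minimal sketch that uses only `java.net`. It is not Kafka's `ClientUtils` code: `BootstrapResolutionSketch` and `resolvable` are hypothetical names, and the assumption is that an entry counts as usable when `InetSocketAddress.isUnresolved()` returns false. IP literals need no DNS lookup, whereas names such as `one` or `two` depend on the resolver of the machine running the test, which may be why results differed between environments.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Kafka.
final class BootstrapResolutionSketch {

    // Returns the host:port entries of a comma-separated list that resolve to an address.
    // Assumes plain "host:port" entries (no bracketed IPv6 literals).
    static List<InetSocketAddress> resolvable(String bootstrapServers) {
        List<InetSocketAddress> resolved = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String hostPort = entry.trim();
            if (hostPort.isEmpty()) {
                continue;
            }
            int colon = hostPort.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected host:port but got " + hostPort);
            }
            String host = hostPort.substring(0, colon);
            int port = Integer.parseInt(hostPort.substring(colon + 1));
            // The constructor attempts name resolution; IP literals are parsed without a lookup.
            InetSocketAddress address = new InetSocketAddress(host, port);
            if (!address.isUnresolved()) {
                resolved.add(address);
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        // IP literals: both entries are kept.
        System.out.println(resolvable("127.0.0.1:9092, 127.0.0.2:9092"));
        // Host names: the result depends on the local resolver configuration.
        System.out.println(resolvable("one:9092,two:9092"));
    }
}
```

An empty result for the second call would correspond to the "no resolvable bootstrap URLs" situation; a non-empty one would mean the environment happens to resolve those names.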






[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-11-14 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1022049731


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java:
##
@@ -77,20 +79,23 @@ public void testReplicationConfigProperties() {
 
 @Test
 public void testClientConfigProperties() {
+String clusterABBootstrap = "127.0.0.1:9092, 127.0.0.2:9092";

Review Comment:
   No worries. I'm still not sure why the test passes locally when I run 
`./gradlew connect:mirror:unitTest` but fails on Jenkins. I'll revert them. 






[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-11-14 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1021801927


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java:
##
@@ -77,20 +79,23 @@ public void testReplicationConfigProperties() {
 
 @Test
 public void testClientConfigProperties() {
+String clusterABBootstrap = "127.0.0.1:9092, 127.0.0.2:9092";
+String clusterBBoostrap = "127.0.0.3:9092, 127.0.0.4:9092";

Review Comment:
   fixed the typo






[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-11-14 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1021801503


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -16,23 +16,37 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateTopicsOptions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.Map;
 import java.util.List;
 import java.util.HashMap;
 import java.util.Collections;
+import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
 
+import static java.util.Collections.singleton;
+
 /** Internal utility methods. */
 final class MirrorUtils {
+private static final Logger log = LoggerFactory.getLogger(MirrorCheckpointTask.class);

Review Comment:
   Done, sorry for this rookie mistake.






[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-11-14 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1021800352


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java:
##
@@ -77,20 +79,23 @@ public void testReplicationConfigProperties() {
 
 @Test
 public void testClientConfigProperties() {
+String clusterABBootstrap = "127.0.0.1:9092, 127.0.0.2:9092";

Review Comment:
   When I first wrote this test, it failed locally with a `ConfigException` 
from `ClientUtils.parseAndValidateAddresses` with the message 
`No resolvable bootstrap URLs given in one:9092,two:9092`. Using IP addresses 
fixed the issue. The test now seems fine when I use the old bootstrap servers again. 






[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-11-08 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1015481020


##
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java:
##
@@ -69,6 +74,17 @@ public ReplicationPolicy replicationPolicy() {
 return getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class);
 }
 
+@SuppressWarnings("unchecked")
+ForwardingAdmin getForwardingAdmin(Map config) {
+try {
+return Utils.newParameterizedInstance(

Review Comment:
   Yes, however, based on my understanding of the `Configurable` interface: 
   1. A `Configurable` usually has no logic in its constructor and takes no 
constructor parameters. Instead, configurations are passed to 
`Configurable.configure(Map props)`, which `getConfiguredInstance` calls 
after instantiating the `Configurable`.
   
   2. Subclasses of a `Configurable` aren't forced to implement `configure`, 
and aren't forced to call `super.configure` in their own implementation of 
`configure`. 

   I can change it to `Configurable` if you prefer, but my point is that 
`Configurable` doesn't force subclasses to inherit the initialization or 
configuration logic, because that logic lives in `configure` and not in the constructor. 
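
The difference can be sketched in plain Java. All names below (`InstantiationSketch`, `ConfigurableLike`, and the nested classes) are hypothetical stand-ins, not Kafka's `Configurable`, `ForwardingAdmin`, or `Utils.newParameterizedInstance`; the reflection helper only assumes a public constructor that takes a `Map`.

```java
import java.lang.reflect.Constructor;
import java.util.Map;

// Hypothetical illustration of the two instantiation styles discussed above.
final class InstantiationSketch {

    // Stand-in for a configure-after-construction contract.
    interface ConfigurableLike {
        void configure(Map<String, Object> props);
    }

    static class BaseConfigurable implements ConfigurableLike {
        String bootstrap;

        @Override
        public void configure(Map<String, Object> props) {
            bootstrap = (String) props.get("bootstrap.servers");
        }
    }

    // Compiles even though it never calls super.configure, so the base state stays unset.
    static class ForgetfulConfigurable extends BaseConfigurable {
        @Override
        public void configure(Map<String, Object> props) {
            // intentionally no super.configure(props)
        }
    }

    static class BaseWithConstructor {
        final String bootstrap;

        public BaseWithConstructor(Map<String, Object> config) {
            this.bootstrap = (String) config.get("bootstrap.servers");
        }
    }

    // The base class has no no-arg constructor, so the compiler requires super(config) here.
    static class CustomWithConstructor extends BaseWithConstructor {
        public CustomWithConstructor(Map<String, Object> config) {
            super(config);
        }
    }

    // Reflectively invokes the public constructor that accepts a Map.
    static <T> T newWithConfig(Class<T> type, Map<String, Object> config) {
        try {
            Constructor<T> constructor = type.getConstructor(Map.class);
            return constructor.newInstance(config);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> config = Map.of("bootstrap.servers", "127.0.0.1:9092");

        BaseConfigurable forgetful = new ForgetfulConfigurable();
        forgetful.configure(config);
        System.out.println("configure() path: " + forgetful.bootstrap);

        CustomWithConstructor custom = newWithConfig(CustomWithConstructor.class, config);
        System.out.println("constructor path: " + custom.bootstrap);
    }
}
```

In this sketch the constructor-based subclass cannot skip the base initialization, while the `configure`-based subclass can do so silently.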






[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-11-07 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1015617856


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter;
+import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
+import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+import org.apache.kafka.connect.mirror.MirrorSourceConnector;
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata;
+import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS;
+import static org.apache.kafka.connect.mirror.TestUtils.generateRecords;
+import static org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.basicMM2Config;
+import static org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests MM2 is using provided ForwardingAdmin to create/alter topics, partitions and ACLs.
+ */
+@Tag("integration")
+public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest {

Review Comment:
   The test now extends `MirrorConnectorsIntegrationBaseTest`. 
   I also fixed the following tests
   - 
`Iden

[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-11-07 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1015481020


##
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java:
##
@@ -69,6 +74,17 @@ public ReplicationPolicy replicationPolicy() {
 return getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class);
 }
 
+@SuppressWarnings("unchecked")
+ForwardingAdmin getForwardingAdmin(Map config) {
+try {
+return Utils.newParameterizedInstance(

Review Comment:
   Yes, however, based on my understanding of the `Configurable` interface: 
   1. A `Configurable` usually has no logic in its constructor and takes no 
constructor parameters. Instead, configurations are passed to 
`Configurable.configure(Map props)`, which `getConfiguredInstance` calls 
after instantiating the `Configurable`.
   
   2. Subclasses of a `Configurable` aren't forced to implement `configure`, 
and aren't forced to call `super.configure` in their own implementation of 
`configure`. 

   I can change it to `Configurable` if you prefer, but my point is that 
`Configurable` doesn't force subclasses to inherit the initialization or 
configuration logic. 






[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-11-02 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1012326106


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:
##
@@ -0,0 +1,553 @@
+/**
+ * Tests MM2 is using provided ForwardingAdmin to create/alter topics, partitions and ACLs.
+ */
+@Tag("integration")
+public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest {

Review Comment:
   yes this would be better than raising another pr to use 
`MirrorConnectorsIntegrationBaseTest` for the pr te

[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-31 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1009669418


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:
##
@@ -0,0 +1,553 @@
+/**
+ * Tests MM2 is using provided ForwardingAdmin to create/alter topics, partitions and ACLs.
+ */
+@Tag("integration")
+public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest {

Review Comment:
   @mimaison I created this jira 
[https://issues.apache.org/jira/browse/KAFKA-14344](https://issues.apache.org

[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-28 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1008151429


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java:
##
@@ -82,15 +84,17 @@ public void testClientConfigProperties() {
 "config.providers", "fake",
 "config.providers.fake.class", FakeConfigProvider.class.getName(),
 "replication.policy.separator", "__",
-"ssl.truststore.password", "secret1",
 "ssl.key.password", "${fake:secret:password}",  // resolves to "secret2"
-"security.protocol", "SSL", 
-"a.security.protocol", "PLAINTEXT", 
+"security.protocol", "SSL",
+"a.ssl.truststore.password", "secret1",

Review Comment:
   removed `ssl.truststore.password`






[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-28 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1008151052


##
clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java:
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicCollection;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * {@code ForwardingAdmin} is the default value of {@code forwarding.admin.class} in MM2.

Review Comment:
   edited it






[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-28 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1008149257


##
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java:
##
@@ -56,7 +58,11 @@ public class MirrorClientConfig extends AbstractConfig {
 private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator 
used in remote topic naming convention.";
 public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
 DefaultReplicationPolicy.SEPARATOR_DEFAULT;
-
+
+public static final String FORWARDING_ADMIN_CLASS = 
"forwarding.admin.class";
+public static final String FORWARDING_ADMIN_CLASS_DOC = "Class which 
extends ForwardingAdmin to define custom cluster resource management (topics, 
configs, etc). " +
+"The class must have a contractor that accept configuration 
(Map config) to configure KafkaAdminClient and any other needed 
clients.";

Review Comment:
   fixed the typo
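
   For readers following the thread, the constructor contract described in the `forwarding.admin.class` doc string above can be sketched as follows. This is only an illustration under stated assumptions: `BaseAdmin`, `CustomAdmin` and `load` are hypothetical stand-ins written for this example, not Kafka classes, and MM2's real loading code may differ. It shows why a class named in the config needs a public constructor that accepts a `Map`.

```java
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

public class ForwardingAdminLoaderSketch {

    // Hypothetical stand-in for the ForwardingAdmin base class (simplified).
    public static class BaseAdmin {
        protected final Map<String, Object> configs;

        public BaseAdmin(Map<String, Object> configs) {
            this.configs = configs;
        }

        public String describe() {
            return "base admin for " + configs.get("bootstrap.servers");
        }
    }

    // Hypothetical custom implementation; it exposes the same Map constructor.
    public static class CustomAdmin extends BaseAdmin {
        public CustomAdmin(Map<String, Object> configs) {
            super(configs);
        }

        @Override
        public String describe() {
            return "custom admin for " + configs.get("bootstrap.servers");
        }
    }

    // Looks the class up by name and calls its (Map) constructor reflectively.
    // A class without a public Map constructor is rejected here.
    public static BaseAdmin load(String className, Map<String, Object> configs) {
        try {
            Class<? extends BaseAdmin> clazz =
                Class.forName(className).asSubclass(BaseAdmin.class);
            Constructor<? extends BaseAdmin> ctor = clazz.getConstructor(Map.class);
            return ctor.newInstance(configs);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9092");
        configs.put("forwarding.admin.class", CustomAdmin.class.getName());

        BaseAdmin admin = load((String) configs.get("forwarding.admin.class"), configs);
        System.out.println(admin.describe());
    }
}
```

   Passing the whole config map to the constructor lets the custom class configure the wrapped admin client and any other clients it needs from the same properties.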






[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-28 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1008147190


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter;
+import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
+import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+import org.apache.kafka.connect.mirror.MirrorSourceConnector;
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import 
org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata;
+import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS;
+import static org.apache.kafka.connect.mirror.TestUtils.generateRecords;
+import static 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.basicMM2Config;
+import static 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests MM2 is using provided ForwardingAdmin to create/alter topics, 
partitions and ACLs.
+ */
+@Tag("integration")
+public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest {

Review Comment:
   Yeah, I think this will be better. I can open a jira ticket and raise 
another PR to have an easier way to s

[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-28 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1008147190


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter;
+import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
+import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+import org.apache.kafka.connect.mirror.MirrorSourceConnector;
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import 
org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata;
+import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS;
+import static org.apache.kafka.connect.mirror.TestUtils.generateRecords;
+import static 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.basicMM2Config;
+import static 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests MM2 is using provided ForwardingAdmin to create/alter topics, 
partitions and ACLs.
+ */
+@Tag("integration")
+public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest {

Review Comment:
   Yeah, I think this will be better. I can open a Jira and raise another PR to 
have an easier way to set up `

[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-28 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1008074680


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java:
##
@@ -82,15 +84,17 @@ public void testClientConfigProperties() {
 "config.providers", "fake",
 "config.providers.fake.class", FakeConfigProvider.class.getName(),
 "replication.policy.separator", "__",
-"ssl.truststore.password", "secret1",
 "ssl.key.password", "${fake:secret:password}",  // resolves to 
"secret2"
-"security.protocol", "SSL", 
-"a.security.protocol", "PLAINTEXT", 
+"security.protocol", "SSL",
+"a.ssl.truststore.password", "secret1",

Review Comment:
   I can remove `"ssl.truststore.password", "secret1",` and any test for 
this config. 






[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-28 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1008071035


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java:
##
@@ -82,15 +84,17 @@ public void testClientConfigProperties() {
 "config.providers", "fake",
 "config.providers.fake.class", FakeConfigProvider.class.getName(),
 "replication.policy.separator", "__",
-"ssl.truststore.password", "secret1",
 "ssl.key.password", "${fake:secret:password}",  // resolves to 
"secret2"
-"security.protocol", "SSL", 
-"a.security.protocol", "PLAINTEXT", 
+"security.protocol", "SSL",
+"a.ssl.truststore.password", "secret1",

Review Comment:
   The code already had `ssl.truststore.password` with `PLAINTEXT` before for 
both clusters `a` and `b`.   
https://github.com/apache/kafka/blob/6ab4d047d563e0fe42a7c0ed6f10ddecda135595/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java#L85
   
   ```
   "ssl.truststore.password", "secret1",
   "ssl.key.password", "${fake:secret:password}",  // resolves to "secret2"
   "security.protocol", "SSL", 
   "a.security.protocol", "PLAINTEXT", 
   ```
   This set of configs is invalid anyway: it sets `ssl.truststore.password` without 
a truststore and, as you mentioned, it sets the protocol to `PLAINTEXT`. If we 
tried to initialise any client with these configs, it would fail with 
`org.apache.kafka.common.errors.InvalidConfigurationException: SSL trust store 
is not specified, but trust store password is specified.`
   
   However, `testClientConfigProperties` isn't testing whether a set of configs is 
valid. It tests whether MM2 loads the configs prefixed by a cluster alias. 
   
   My change, on the other hand, checks that 
`bClientConfig.forwardingAdmin(bClientConfig.adminConfig())` is an instance of 
`FakeForwardingAdmin`. That requires actually instantiating the admin, so I can't 
use the original configs, which set `"ssl.truststore.password", "secret1",` for 
both clusters. So I limited it to cluster `a` and kept the rest of the assertions 
in the test case: 
https://github.com/apache/kafka/blob/6ab4d047d563e0fe42a7c0ed6f10ddecda135595/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java#L112
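
   To make the alias-prefix behaviour discussed above concrete, here is a minimal sketch. It is an assumption-laden simplification, not MM2's actual code: `clientConfigFor` is a hypothetical helper, and the real `MirrorMakerConfig` applies additional filtering. The idea is that un-prefixed properties are shared by every cluster, and properties prefixed with `<alias>.` apply only to that cluster and override the shared ones.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class ClusterPrefixedConfigSketch {

    // Hypothetical helper: resolves the effective client config for one cluster alias.
    public static Map<String, String> clientConfigFor(String alias,
                                                      Set<String> aliases,
                                                      Map<String, String> props) {
        Map<String, String> resolved = new TreeMap<>();

        // Pass 1: copy shared properties, i.e. keys not scoped to any cluster alias.
        for (Map.Entry<String, String> entry : props.entrySet()) {
            String key = entry.getKey();
            boolean clusterScoped = aliases.stream().anyMatch(a -> key.startsWith(a + "."));
            if (!clusterScoped) {
                resolved.put(key, entry.getValue());
            }
        }

        // Pass 2: overlay properties scoped to this alias, with the prefix stripped.
        String prefix = alias + ".";
        for (Map.Entry<String, String> entry : props.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                resolved.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        // Mirrors the two entries from the diff: one shared, one limited to cluster "a".
        Map<String, String> props = new HashMap<>();
        props.put("security.protocol", "SSL");
        props.put("a.ssl.truststore.password", "secret1");

        Set<String> aliases = new HashSet<>(Arrays.asList("a", "b"));
        System.out.println("a -> " + clientConfigFor("a", aliases, props));
        System.out.println("b -> " + clientConfigFor("b", aliases, props));
    }
}
```

   With the password scoped to `a`, the resolved config for `b` contains no truststore password, which is the reason the test can instantiate the admin for `b`.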
 






[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-28 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1008071035


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java:
##
@@ -82,15 +84,17 @@ public void testClientConfigProperties() {
 "config.providers", "fake",
 "config.providers.fake.class", FakeConfigProvider.class.getName(),
 "replication.policy.separator", "__",
-"ssl.truststore.password", "secret1",
 "ssl.key.password", "${fake:secret:password}",  // resolves to 
"secret2"
-"security.protocol", "SSL", 
-"a.security.protocol", "PLAINTEXT", 
+"security.protocol", "SSL",
+"a.ssl.truststore.password", "secret1",

Review Comment:
   The code already had `ssl.truststore.password` with `PLAINTEXT` before for 
both clusters `a` and `b`.   
https://github.com/apache/kafka/blob/6ab4d047d563e0fe42a7c0ed6f10ddecda135595/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java#L85
   
   ```
   "ssl.truststore.password", "secret1",
   "ssl.key.password", "${fake:secret:password}",  // resolves to "secret2"
   "security.protocol", "SSL", 
   "a.security.protocol", "PLAINTEXT", 
   ```
   This set of configs is invalid anyway: it sets `ssl.truststore.password` without 
a truststore and, as you mentioned, it sets the protocol to `PLAINTEXT`. If we 
tried to initialise any client with these configs, it would fail with 
`org.apache.kafka.common.errors.InvalidConfigurationException: SSL trust store 
is not specified, but trust store password is specified.`
   
   However, `testClientConfigProperties` isn't testing whether a set of configs is 
valid. It tests whether each cluster's clients are configured with the configs 
prefixed by that cluster's alias. 
   
   My change, on the other hand, checks that 
`bClientConfig.forwardingAdmin(bClientConfig.adminConfig())` is an instance of 
`FakeForwardingAdmin`. That requires actually instantiating the admin, so I can't 
use the original configs, which set `"ssl.truststore.password", "secret1",` for 
both clusters. So I limited it to cluster `a` and kept the rest of the assertions 
in the test case: 
https://github.com/apache/kafka/blob/6ab4d047d563e0fe42a7c0ed6f10ddecda135595/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java#L112
 






[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-28 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1008047026


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter;
+import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
+import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+import org.apache.kafka.connect.mirror.MirrorSourceConnector;
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import 
org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata;
+import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS;
+import static org.apache.kafka.connect.mirror.TestUtils.generateRecords;
+import static 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.basicMM2Config;
+import static 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests MM2 is using provided ForwardingAdmin to create/alter topics, 
partitions and ACLs.
+ */
+@Tag("integration")
+public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest {

Review Comment:
   Setting up `EmbededCluster` with `AclAuthorizer` to test ACLs sync was a lot 
of work, this is why I separat

[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-27 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1007434322


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:
##
@@ -0,0 +1,596 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter;
+import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
+import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+import org.apache.kafka.connect.mirror.MirrorSourceConnector;
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import 
org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata;
+import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS;
+import static org.apache.kafka.connect.mirror.TestUtils.generateRecords;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests MM2 is using provided ForwardingAdmin to create/alter topics, 
partitions and ACLs.
+ */
+@Tag("integration")
+public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest {
+protected static final int NUM_RECORDS_PER_PARTITION = 1;
+protected static final int NUM_PARTITIONS = 1;
+protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * 
NUM_RECORDS_PER_PARTITION;
+protected static final Duration CONSUMER_POLL_TIMEOUT_MS = 
Duration.ofMillis(500L);
+protected static final String PRIMARY_CLUSTER_ALIAS = "pr

[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-27 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1007424805


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:
##
@@ -0,0 +1,596 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter;
+import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
+import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+import org.apache.kafka.connect.mirror.MirrorSourceConnector;
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import 
org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata;
+import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS;
+import static org.apache.kafka.connect.mirror.TestUtils.generateRecords;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests MM2 is using provided ForwardingAdmin to create/alter topics, partitions and ACLs.
+ */
+@Tag("integration")
+public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest {
+protected static final int NUM_RECORDS_PER_PARTITION = 1;
+protected static final int NUM_PARTITIONS = 1;
+protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION;
+protected static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500L);
+protected static final String PRIMARY_CLUSTER_ALIAS = "pr

[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-27 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1007412233


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:
##
@@ -0,0 +1,596 @@
+/*

Review Comment:
   Fixed the checkstyle issues; `./gradlew checkstyleMain checkstyleTest` should now succeed.



[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-27 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r1007411497


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeLocalMetadataStore.java:
##
@@ -0,0 +1,15 @@
+package org.apache.kafka.connect.mirror.clients.admin;

Review Comment:
   Added the license 



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java:
##
@@ -0,0 +1,117 @@
+package org.apache.kafka.connect.mirror.clients.admin;

Review Comment:
   Added the license



[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-18 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r998207034


##
clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java:
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicCollection;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * {@code ForwardingAdmin} is the default value of {@code forwarding.admin.class} in MM2.
+ * Users who wish to customize the MM2 behaviour for the creation of topics and access control lists can extend this
+ * class without needing to provide a whole implementation of {@code Admin}.
+ * The class must have a constructor with signature {@code (Map config)} for configuring
+ * a decorated {@link KafkaAdminClient} and any other clients needed for external resource management.
+ */
+public class ForwardingAdmin implements Admin {
+private final Admin delegate;
+
+public ForwardingAdmin(Map configs) {
+this.delegate = Admin.create(configs);
+}
+
+@Override
+public void close(Duration timeout) {
+delegate.close();

Review Comment:
   Yes, good catch. Updated it.
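
   For context, the quoted hunk shows `close(Duration timeout)` calling `delegate.close()`, which drops the caller's timeout. A minimal sketch of forwarding the argument instead, using stand-in types (`Closer`, `RecordingCloser` and `ForwardingCloser` are hypothetical names for illustration, not Kafka's `Admin` or `ForwardingAdmin`):

```java
import java.time.Duration;

// Stand-in types for illustration only; these are not Kafka classes.
class CloseTimeoutSketch {
    interface Closer {
        void close(Duration timeout);
    }

    // Records the timeout it was closed with so the forwarding can be observed.
    static class RecordingCloser implements Closer {
        Duration lastTimeout;

        @Override
        public void close(Duration timeout) {
            lastTimeout = timeout;
        }
    }

    static class ForwardingCloser implements Closer {
        private final Closer delegate;

        ForwardingCloser(Closer delegate) {
            this.delegate = delegate;
        }

        @Override
        public void close(Duration timeout) {
            // Pass the caller's timeout through instead of ignoring it.
            delegate.close(timeout);
        }
    }

    public static void main(String[] args) {
        RecordingCloser inner = new RecordingCloser();
        new ForwardingCloser(inner).close(Duration.ofSeconds(5));
        System.out.println("delegate closed with timeout " + inner.lastTimeout);
    }
}
```

   The point of the sketch is only that a forwarding wrapper should hand every argument to the delegate; otherwise a bounded close silently becomes an unbounded one.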



[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-18 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r998206276


##
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java:
##
@@ -56,7 +58,10 @@ public class MirrorClientConfig extends AbstractConfig {
 private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention.";
 public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
 DefaultReplicationPolicy.SEPARATOR_DEFAULT;
-
+
+public static final String FORWARDING_ADMIN_CLASS = "forwarding.admin.class";
+public static final String FORWARDING_ADMIN_CLASS_DOC = "Class which extends ForwardingAdmin to define custom cluster resource management (topics, configs, etc).";

Review Comment:
   Updated the description.
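
   To illustrate what "a class which extends ForwardingAdmin" can look like in spirit, here is a minimal sketch with stand-in types. `TopicApi`, `Forwarding` and `AuditingForwarding` are hypothetical names, and the sketch passes the delegate directly instead of building it from a config map as `ForwardingAdmin` does:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types for illustration only; none of these are Kafka classes.
class CustomForwardingSketch {
    interface TopicApi {
        void createTopic(String name);
        void deleteTopic(String name);
    }

    // Plays the role of the real client: remembers which topics exist.
    static class InMemoryTopics implements TopicApi {
        final List<String> topics = new ArrayList<>();

        @Override
        public void createTopic(String name) { topics.add(name); }

        @Override
        public void deleteTopic(String name) { topics.remove(name); }
    }

    // Base decorator: forwards every call unchanged.
    static class Forwarding implements TopicApi {
        private final TopicApi delegate;

        Forwarding(TopicApi delegate) { this.delegate = delegate; }

        @Override
        public void createTopic(String name) { delegate.createTopic(name); }

        @Override
        public void deleteTopic(String name) { delegate.deleteTopic(name); }
    }

    // Custom subclass: overrides only topic creation to also update an external registry.
    static class AuditingForwarding extends Forwarding {
        final List<String> registry = new ArrayList<>();

        AuditingForwarding(TopicApi delegate) { super(delegate); }

        @Override
        public void createTopic(String name) {
            registry.add(name);
            super.createTopic(name);
        }
    }

    public static void main(String[] args) {
        InMemoryTopics cluster = new InMemoryTopics();
        AuditingForwarding admin = new AuditingForwarding(cluster);
        admin.createTopic("heartbeats");
        System.out.println(admin.registry + " " + cluster.topics);
    }
}
```

   The subclass overrides one method and inherits plain forwarding for everything else, which is the benefit of extending a forwarding base instead of implementing the whole interface.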



[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-13 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r994785155


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java:
##
@@ -92,6 +94,9 @@ public class MirrorConnectorConfig extends AbstractConfig {
 private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention.";
 public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
 MirrorClientConfig.REPLICATION_POLICY_SEPARATOR_DEFAULT;
+public static final String FORWARDING_ADMIN_CLASS = MirrorClientConfig.FORWARDING_ADMIN_CLASS;

Review Comment:
   I'm planning to raise another KIP for Connect and see how the community 
feels about it.



[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-11 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r992140426


##
clients/src/test/java/org/apache/kafka/clients/admin/FakeForwardingAdmin.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.Map;
+
+public class FakeForwardingAdmin extends ForwardingAdmin {

Review Comment:
   `FakeForwardingAdmin` is here because `ForwardingAdmin` is in 
`clients/admin`, but I don't mind moving them to `connect/mirror` at this point. 
They are in `clients/admin` based on suggestions in the KIP discussion on 
the mailing list to make the forwarding admin pluggable in other clients in the 
future.



[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-10 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r991579817


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java:
##
@@ -436,6 +441,17 @@ ReplicationPolicy replicationPolicy() {
 return getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class);
 }
 
+@SuppressWarnings("unchecked")
+ForwardingAdmin getForwardingAdmin(Map config) {
+try {
+return Utils.newParameterizedInstance(
+getClass(FORWARDING_ADMIN_CLASS).getName(), (Class<Map<String, Object>>) (Class) Map.class, config

Review Comment:
   I added `rawtypes`.
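
   Background on why the suppression is needed: `Map.class` has the raw type `Class<Map>`, so obtaining a `Class<Map<String, Object>>` means casting through a raw `Class`, which produces both rawtypes and unchecked warnings. The sketch below shows the general idea of instantiating a configured class through its `(Map)` constructor using plain reflection. `BasePlugin`, `CustomPlugin` and `instantiate` are hypothetical names; this is not how `Utils.newParameterizedInstance` is implemented, only an illustration of the pattern.

```java
import java.util.HashMap;
import java.util.Map;

class ReflectiveFactorySketch {
    // Hypothetical base type standing in for a class that must expose a (Map) constructor.
    public static class BasePlugin {
        final Map<String, Object> config;

        public BasePlugin(Map<String, Object> config) {
            this.config = config;
        }
    }

    // Hypothetical user-supplied subclass selected by name at runtime.
    public static class CustomPlugin extends BasePlugin {
        public CustomPlugin(Map<String, Object> config) {
            super(config);
        }
    }

    // Looks up the public single-argument (Map) constructor by class name and invokes it.
    static BasePlugin instantiate(String className, Map<String, Object> config) {
        try {
            Class<? extends BasePlugin> type = Class.forName(className).asSubclass(BasePlugin.class);
            return type.getConstructor(Map.class).newInstance(config);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9092");
        BasePlugin plugin = instantiate(CustomPlugin.class.getName(), config);
        System.out.println(plugin.getClass().getSimpleName());
    }
}
```

   If the named class lacks a public `(Map)` constructor, `getConstructor` throws `NoSuchMethodException`, which the sketch wraps in an `IllegalStateException`.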



[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-10 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r991577963


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -79,8 +80,8 @@ public class MirrorSourceConnector extends SourceConnector {
 private List knownTargetTopicPartitions = Collections.emptyList();
 private ReplicationPolicy replicationPolicy;
 private int replicationFactor;
-private AdminClient sourceAdminClient;
-private AdminClient targetAdminClient;
+private ForwardingAdmin sourceAdminClient;
+private ForwardingAdmin targetAdminClient;

Review Comment:
   Same as the others.



[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-10 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r991569076


##
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java:
##
@@ -56,7 +56,7 @@
 public class MirrorClient implements AutoCloseable {
 private static final Logger log = LoggerFactory.getLogger(MirrorClient.class);
 
-private final AdminClient adminClient;
+private final ForwardingAdmin adminClient;

Review Comment:
   Also, the original MM2 codebase used `Admin` in some places and 
`AdminClient` in others, so `Admin` is better for consistency.



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -47,7 +47,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
 private Scheduler scheduler;
 private MirrorConnectorConfig config;
 private GroupFilter groupFilter;
-private AdminClient sourceAdminClient;
+private ForwardingAdmin sourceAdminClient;

Review Comment:
   Same as the other comment



[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-10 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r991567235


##
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java:
##
@@ -56,7 +56,7 @@
 public class MirrorClient implements AutoCloseable {
 private static final Logger log = LoggerFactory.getLogger(MirrorClient.class);
 
-private final AdminClient adminClient;
+private final ForwardingAdmin adminClient;

Review Comment:
   I changed the type to `Admin` instead of `AdminClient`, as `AdminClient` and 
`ForwardingAdmin` both implement the `Admin` interface.



[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-10 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r991567049


##
clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java:
##
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicCollection;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class ForwardingAdmin implements Admin {
+private final Admin delegate;
+
+public ForwardingAdmin(Map configs) {
+this.delegate = AdminClient.create(configs);

Review Comment:
   Switched it to `Admin.create` instead of `AdminClient.create`.



[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-10 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r991566688


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -98,19 +112,51 @@ static Pattern compilePatternList(String fields) {
 return compilePatternList(Arrays.asList(fields.split("\\W*,\\W*")));
 }
 
-static void createCompactedTopic(String topicName, short partitions, short replicationFactor, Map adminProps) {
+static void createCompactedTopic(String topicName, short partitions, short replicationFactor, ForwardingAdmin forwardingAdmin) {
 NewTopic topicDescription = TopicAdmin.defineTopic(topicName).
 compacted().
 partitions(partitions).
 replicationFactor(replicationFactor).
 build();
 
-try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-admin.createTopics(topicDescription);
+CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false);
+try {
+forwardingAdmin.createTopics(singleton(topicDescription), args).values().get(topicName).get();
+log.info("Created topic", topicName);
+} catch (InterruptedException e) {
+Thread.interrupted();
+throw new ConnectException("Interrupted while attempting to create/find topic '" + topicName + "'", e);
+} catch (ExecutionException e) {
+Throwable cause = e.getCause();
+if (cause instanceof TopicExistsException) {
+log.debug("Found existing topic '{}'", topicName);
+}
+if (cause instanceof UnsupportedVersionException) {
+log.debug("Unable to create topic(s) '{}' since the brokers do not support the CreateTopics API." +
+" Falling back to assume topic exist or will be auto-created by the broker.",
+topicName);
+}
+if (cause instanceof ClusterAuthorizationException) {
+log.debug("Not authorized to create topic '{}' upon the brokers." +
+" Falling back to assume topic(s) exist or will be auto-created by the broker.",
+topicName);
+}
+if (cause instanceof InvalidConfigurationException) {
+throw new ConnectException("Unable to create topic '" + topicName + "': " + cause.getMessage(),
+cause);
+}
+if (cause instanceof TimeoutException) {
+// Timed out waiting for the operation to complete

Review Comment:
   Same here as well: this comment comes from `TopicAdmin.createOrFindTopics`, 
so I kept it.



[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-10 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r991566853


##
clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java:
##
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicCollection;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class ForwardingAdmin implements Admin {

Review Comment:
   Added Javadoc.



[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-10 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r991566108


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -98,19 +112,51 @@ static Pattern compilePatternList(String fields) {
 return compilePatternList(Arrays.asList(fields.split("\\W*,\\W*")));
 }
 
-static void createCompactedTopic(String topicName, short partitions, short replicationFactor, Map adminProps) {
+static void createCompactedTopic(String topicName, short partitions, short replicationFactor, ForwardingAdmin forwardingAdmin) {
 NewTopic topicDescription = TopicAdmin.defineTopic(topicName).
 compacted().
 partitions(partitions).
 replicationFactor(replicationFactor).
 build();
 
-try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-admin.createTopics(topicDescription);
+CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false);
+try {
+forwardingAdmin.createTopics(singleton(topicDescription), args).values().get(topicName).get();
+log.info("Created topic", topicName);
+} catch (InterruptedException e) {
+Thread.interrupted();
+throw new ConnectException("Interrupted while attempting to create/find topic '" + topicName + "'", e);
+} catch (ExecutionException e) {
+Throwable cause = e.getCause();
+if (cause instanceof TopicExistsException) {
+log.debug("Found existing topic '{}'", topicName);
+}
+if (cause instanceof UnsupportedVersionException) {
+log.debug("Unable to create topic(s) '{}' since the brokers do not support the CreateTopics API." +
+" Falling back to assume topic exist or will be auto-created by the broker.",
+topicName);
+}
+if (cause instanceof ClusterAuthorizationException) {
+log.debug("Not authorized to create topic '{}' upon the brokers." +
+" Falling back to assume topic(s) exist or will be auto-created by the broker.",
+topicName);
+}
+if (cause instanceof InvalidConfigurationException) {

Review Comment:
   This is the same error message as in `TopicAdmin.createOrFindTopics`; I am 
trying to keep the same error-handling behaviour and logs for customers while 
switching to `ForwardingAdmin` instead of `Admin`, `AdminClient` or 
`TopicAdmin`.
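
   The handling pattern in the hunk above (wait on the future, unwrap the `ExecutionException`, tolerate some causes and rethrow others) can be sketched without Kafka types as follows. The exception classes and the `await` helper are hypothetical stand-ins, and the sketch restores the interrupt flag with `Thread.currentThread().interrupt()`, whereas the quoted hunk calls `Thread.interrupted()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class CreateTopicOutcomeSketch {
    // Hypothetical stand-ins for broker-side errors; not Kafka exception classes.
    static class AlreadyExists extends RuntimeException { }
    static class NotAuthorized extends RuntimeException { }
    static class BadConfig extends RuntimeException {
        BadConfig(String message) { super(message); }
    }

    // Waits for the result and maps the failure cause to a tolerated outcome or an error.
    static String await(String topic, Future<Void> result) {
        try {
            result.get();
            return "created";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while creating topic '" + topic + "'", e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof AlreadyExists) {
                return "exists"; // tolerated: the topic is already there
            }
            if (cause instanceof NotAuthorized) {
                return "assumed-present"; // tolerated: assume it exists or will be auto-created
            }
            throw new IllegalStateException(
                    "Unable to create topic '" + topic + "': " + cause.getMessage(), cause);
        }
    }

    // Helper that builds an already-failed future for the demo.
    static Future<Void> failed(Throwable t) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(t);
        return future;
    }

    public static void main(String[] args) {
        System.out.println(await("offsets", CompletableFuture.<Void>completedFuture(null)));
        System.out.println(await("offsets", failed(new AlreadyExists())));
    }
}
```

   Returning early for each tolerated cause keeps the fatal cases at the bottom of the handler, so an unexpected cause cannot fall through silently.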



[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-10 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r991565995


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -98,19 +112,51 @@ static Pattern compilePatternList(String fields) {
 return compilePatternList(Arrays.asList(fields.split("\\W*,\\W*")));
 }
 
-static void createCompactedTopic(String topicName, short partitions, short replicationFactor, Map adminProps) {
+static void createCompactedTopic(String topicName, short partitions, short replicationFactor, ForwardingAdmin forwardingAdmin) {
 NewTopic topicDescription = TopicAdmin.defineTopic(topicName).
 compacted().
 partitions(partitions).
 replicationFactor(replicationFactor).
 build();
 
-try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-admin.createTopics(topicDescription);
+CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false);
+try {
+forwardingAdmin.createTopics(singleton(topicDescription), args).values().get(topicName).get();
+log.info("Created topic", topicName);

Review Comment:
   Fixed this.
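
   The quoted line `log.info("Created topic", topicName)` passes an argument but has no `{}` placeholder, so presumably the issue was that the topic name never reached the log output. The sketch below uses a simplified stand-in formatter (`format` is a hypothetical helper, not the SLF4J implementation) to show the effect of a missing placeholder:

```java
class PlaceholderSketch {
    // Simplified stand-in for SLF4J-style formatting: each "{}" consumes one argument in order.
    static String format(String pattern, Object... args) {
        StringBuilder out = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = pattern.indexOf("{}", from);
            if (at < 0) {
                break; // no placeholder left: remaining arguments are dropped
            }
            out.append(pattern, from, at).append(arg);
            from = at + 2;
        }
        return out.append(pattern.substring(from)).toString();
    }

    public static void main(String[] args) {
        // Without a placeholder the topic name is silently lost.
        System.out.println(format("Created topic", "mm2-offsets"));
        // With a placeholder the argument is substituted into the message.
        System.out.println(format("Created topic '{}'", "mm2-offsets"));
    }
}
```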



[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-10 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r991565793


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -98,19 +112,51 @@ static Pattern compilePatternList(String fields) {
 return compilePatternList(Arrays.asList(fields.split("\\W*,\\W*")));
 }
 
-static void createCompactedTopic(String topicName, short partitions, short replicationFactor, Map adminProps) {
+static void createCompactedTopic(String topicName, short partitions, short replicationFactor, ForwardingAdmin forwardingAdmin) {
 NewTopic topicDescription = TopicAdmin.defineTopic(topicName).
 compacted().
 partitions(partitions).
 replicationFactor(replicationFactor).
 build();
 
-try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-admin.createTopics(topicDescription);
+CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false);

Review Comment:
   You're right. I kept it because `TopicAdmin.createOrFindTopics()` sets 
`validateOnly`, and I'm trying to keep the same logic while switching the code 
to use `ForwardingAdmin` instead.






[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-10 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r991564993


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -98,19 +112,51 @@ static Pattern compilePatternList(String fields) {
 return compilePatternList(Arrays.asList(fields.split("\\W*,\\W*")));
 }
 
-static void createCompactedTopic(String topicName, short partitions, short 
replicationFactor, Map adminProps) {
+static void createCompactedTopic(String topicName, short partitions, short 
replicationFactor, ForwardingAdmin forwardingAdmin) {
 NewTopic topicDescription = TopicAdmin.defineTopic(topicName).
 compacted().
 partitions(partitions).
 replicationFactor(replicationFactor).
 build();
 
-try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-admin.createTopics(topicDescription);
+CreateTopicsOptions args = new 
CreateTopicsOptions().validateOnly(false);
+try {
+forwardingAdmin.createTopics(singleton(topicDescription), 
args).values().get(topicName).get();
+log.info("Created topic", topicName);
+} catch (InterruptedException e) {
+Thread.interrupted();
+throw new ConnectException("Interrupted while attempting to 
create/find topic '" + topicName + "'", e);
+} catch (ExecutionException e) {
+Throwable cause = e.getCause();
+if (cause instanceof TopicExistsException) {
+log.debug("Found existing topic '{}'", topicName);
+}
+if (cause instanceof UnsupportedVersionException) {
+log.debug("Unable to create topic(s) '{}' since the brokers do 
not support the CreateTopics API." +
+" Falling back to assume topic exist or will 
be auto-created by the broker.",
+topicName);
+}
+if (cause instanceof ClusterAuthorizationException) {
+log.debug("Not authorized to create topic '{}' upon the 
brokers." +

Review Comment:
   Removed it. It was copied from `TopicAdmin`.






[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-10 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r991564622


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -98,19 +112,51 @@ static Pattern compilePatternList(String fields) {
 return compilePatternList(Arrays.asList(fields.split("\\W*,\\W*")));
 }
 
-static void createCompactedTopic(String topicName, short partitions, short 
replicationFactor, Map adminProps) {
+static void createCompactedTopic(String topicName, short partitions, short 
replicationFactor, ForwardingAdmin forwardingAdmin) {
 NewTopic topicDescription = TopicAdmin.defineTopic(topicName).
 compacted().
 partitions(partitions).
 replicationFactor(replicationFactor).
 build();
 
-try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-admin.createTopics(topicDescription);
+CreateTopicsOptions args = new 
CreateTopicsOptions().validateOnly(false);
+try {
+forwardingAdmin.createTopics(singleton(topicDescription), 
args).values().get(topicName).get();
+log.info("Created topic", topicName);
+} catch (InterruptedException e) {
+Thread.interrupted();
+throw new ConnectException("Interrupted while attempting to 
create/find topic '" + topicName + "'", e);
+} catch (ExecutionException e) {
+Throwable cause = e.getCause();
+if (cause instanceof TopicExistsException) {
+log.debug("Found existing topic '{}'", topicName);
+}
+if (cause instanceof UnsupportedVersionException) {
+log.debug("Unable to create topic(s) '{}' since the brokers do 
not support the CreateTopics API." +

Review Comment:
   fixed.



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -98,19 +112,51 @@ static Pattern compilePatternList(String fields) {
 return compilePatternList(Arrays.asList(fields.split("\\W*,\\W*")));
 }
 
-static void createCompactedTopic(String topicName, short partitions, short 
replicationFactor, Map adminProps) {
+static void createCompactedTopic(String topicName, short partitions, short 
replicationFactor, ForwardingAdmin forwardingAdmin) {
 NewTopic topicDescription = TopicAdmin.defineTopic(topicName).
 compacted().
 partitions(partitions).
 replicationFactor(replicationFactor).
 build();
 
-try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-admin.createTopics(topicDescription);
+CreateTopicsOptions args = new 
CreateTopicsOptions().validateOnly(false);
+try {
+forwardingAdmin.createTopics(singleton(topicDescription), 
args).values().get(topicName).get();
+log.info("Created topic", topicName);
+} catch (InterruptedException e) {
+Thread.interrupted();
+throw new ConnectException("Interrupted while attempting to 
create/find topic '" + topicName + "'", e);
+} catch (ExecutionException e) {
+Throwable cause = e.getCause();
+if (cause instanceof TopicExistsException) {
+log.debug("Found existing topic '{}'", topicName);
+}
+if (cause instanceof UnsupportedVersionException) {
+log.debug("Unable to create topic(s) '{}' since the brokers do 
not support the CreateTopics API." +
+" Falling back to assume topic exist or will 
be auto-created by the broker.",

Review Comment:
   fixed.






[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-10 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r991564465


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -98,19 +112,51 @@ static Pattern compilePatternList(String fields) {
 return compilePatternList(Arrays.asList(fields.split("\\W*,\\W*")));
 }
 
-static void createCompactedTopic(String topicName, short partitions, short 
replicationFactor, Map adminProps) {
+static void createCompactedTopic(String topicName, short partitions, short 
replicationFactor, ForwardingAdmin forwardingAdmin) {
 NewTopic topicDescription = TopicAdmin.defineTopic(topicName).
 compacted().
 partitions(partitions).
 replicationFactor(replicationFactor).
 build();
 
-try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-admin.createTopics(topicDescription);
+CreateTopicsOptions args = new 
CreateTopicsOptions().validateOnly(false);
+try {
+forwardingAdmin.createTopics(singleton(topicDescription), 
args).values().get(topicName).get();
+log.info("Created topic", topicName);
+} catch (InterruptedException e) {
+Thread.interrupted();
+throw new ConnectException("Interrupted while attempting to 
create/find topic '" + topicName + "'", e);
+} catch (ExecutionException e) {
+Throwable cause = e.getCause();
+if (cause instanceof TopicExistsException) {
+log.debug("Found existing topic '{}'", topicName);

Review Comment:
   updated the log message.






[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-10 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r991564315


##
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java:
##
@@ -69,6 +74,17 @@ public ReplicationPolicy replicationPolicy() {
 return getConfiguredInstance(REPLICATION_POLICY_CLASS, 
ReplicationPolicy.class);
 }
 
+@SuppressWarnings("unchecked")
+ForwardingAdmin getForwardingAdmin(Map config) {
+try {
+return Utils.newParameterizedInstance(

Review Comment:
   `getConfiguredInstance` only works with `Configurable`. In my opinion, the 
problem with using `Configurable` here is that it doesn't force subclasses to 
inherit the superclass's initialization behaviour.
   In this case, the behaviour we want every subclass to inherit is the 
initialization of `delegate = Admin.create(config)`, so I used 
`Utils.newParameterizedInstance` instead.
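
   To illustrate the point, here is a minimal, self-contained sketch that uses plain Java reflection rather than Kafka's `Utils.newParameterizedInstance`. The names `ForwarderSketch`, `BaseForwarder`, `AuditingForwarder` and `create` are hypothetical stand-ins, not classes from the PR, and the `String` delegate only imitates `Admin.create(config)`. Because the base class exposes a single constructor that takes the config, a subclass cannot compile without calling `super(config)`, so the delegate is always initialized.

```java
import java.util.HashMap;
import java.util.Map;

public class ForwarderSketch {

    // Hypothetical stand-in for ForwardingAdmin. Its only constructor takes the
    // config, so every subclass is forced to call super(config).
    static class BaseForwarder {
        private final String delegate;

        public BaseForwarder(Map<String, Object> config) {
            // Assumption: imitates `delegate = Admin.create(config)`.
            this.delegate = "delegate(" + config.get("bootstrap.servers") + ")";
        }

        public String describe() {
            return delegate;
        }
    }

    // Hypothetical custom implementation supplied by a user via configuration.
    static class AuditingForwarder extends BaseForwarder {
        public AuditingForwarder(Map<String, Object> config) {
            super(config); // required: BaseForwarder has no no-arg constructor
        }
    }

    // Instantiates the named class through its (Map) constructor, which is
    // roughly the idea behind constructor-based instantiation.
    static BaseForwarder create(String className, Map<String, Object> config) {
        try {
            return Class.forName(className)
                    .asSubclass(BaseForwarder.class)
                    .getConstructor(Map.class)
                    .newInstance(config);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9092");
        BaseForwarder admin = create(AuditingForwarder.class.getName(), config);
        System.out.println(admin.describe());
    }
}
```

   With a `Configurable`-style contract, the instance would be created through a no-arg constructor and configured afterwards, so nothing would stop a subclass from overriding `configure` and skipping the delegate setup.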



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -17,6 +17,8 @@
 package org.apache.kafka.connect.mirror;
 
 import java.util.Map.Entry;
+
+import org.apache.kafka.clients.admin.ForwardingAdmin;

Review Comment:
   Changed this to use `Admin` instead of `ForwardingAdmin`.






[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-10 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r991564134


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java:
##
@@ -436,6 +441,17 @@ ReplicationPolicy replicationPolicy() {
 return getConfiguredInstance(REPLICATION_POLICY_CLASS, 
ReplicationPolicy.class);
 }
 
+@SuppressWarnings("unchecked")
+ForwardingAdmin getForwardingAdmin(Map config) {

Review Comment:
   removed the prefix



##
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java:
##
@@ -69,6 +74,17 @@ public ReplicationPolicy replicationPolicy() {
 return getConfiguredInstance(REPLICATION_POLICY_CLASS, 
ReplicationPolicy.class);
 }
 
+@SuppressWarnings("unchecked")
+ForwardingAdmin getForwardingAdmin(Map config) {

Review Comment:
   removed the prefix


