[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1022162537 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java: ## @@ -77,20 +79,23 @@ public void testReplicationConfigProperties() { @Test public void testClientConfigProperties() { +String clusterABBootstrap = "127.0.0.1:9092, 127.0.0.2:9092"; Review Comment: test now is fine on Jenkins. We have two unrelated failed test cases [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12577/31/testReport/ ](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12577/31/testReport/ ) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1021800352 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java: ## @@ -77,20 +79,23 @@ public void testReplicationConfigProperties() { @Test public void testClientConfigProperties() { +String clusterABBootstrap = "127.0.0.1:9092, 127.0.0.2:9092"; Review Comment: When I wrote this test the first time it failed with `ConfigException` with the following message `No resolvable bootstrap URLs given in bootstrap.servers` from `ClientUtils.parseAndValidateAddresses` locally and using the IPs fixed the issue. It seems fine now when I use the old bootstrap servers again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1022049731 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java: ## @@ -77,20 +79,23 @@ public void testReplicationConfigProperties() { @Test public void testClientConfigProperties() { +String clusterABBootstrap = "127.0.0.1:9092, 127.0.0.2:9092"; Review Comment: No worries, I still not sure why the test is fine locally when I run `./gradlew connect:mirror:unitTest` but not on Jenkins. I'll revert them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1022049731 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java: ## @@ -77,20 +79,23 @@ public void testReplicationConfigProperties() { @Test public void testClientConfigProperties() { +String clusterABBootstrap = "127.0.0.1:9092, 127.0.0.2:9092"; Review Comment: No worries, I still not sure why the test is fine locally when I run `./gradlew connect:mirror:unitTest` but not in Jenkins. I'll revert them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1021801927 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java: ## @@ -77,20 +79,23 @@ public void testReplicationConfigProperties() { @Test public void testClientConfigProperties() { +String clusterABBootstrap = "127.0.0.1:9092, 127.0.0.2:9092"; +String clusterBBoostrap = "127.0.0.3:9092, 127.0.0.4:9092"; Review Comment: fixed the typo -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1021801503 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ## @@ -16,23 +16,37 @@ */ package org.apache.kafka.connect.mirror; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.CreateTopicsOptions; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.ClusterAuthorizationException; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.Map; import java.util.List; import java.util.HashMap; import java.util.Collections; +import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; +import static java.util.Collections.singleton; + /** Internal utility methods. */ final class MirrorUtils { +private static final Logger log = LoggerFactory.getLogger(MirrorCheckpointTask.class); Review Comment: Done sorry for this rookie mistake. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1021800352 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java: ## @@ -77,20 +79,23 @@ public void testReplicationConfigProperties() { @Test public void testClientConfigProperties() { +String clusterABBootstrap = "127.0.0.1:9092, 127.0.0.2:9092"; Review Comment: When I wrote this test the first time it failed with `ConfigException` with the following message `No resolvable bootstrap URLs given in one:9092,two:9092` from `ClientUtils.parseAndValidateAddresses` locally and using the IPs fixed the issue. It seems fine now when I use the old bootstrap servers again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1015481020 ## connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java: ## @@ -69,6 +74,17 @@ public ReplicationPolicy replicationPolicy() { return getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class); } +@SuppressWarnings("unchecked") +ForwardingAdmin getForwardingAdmin(Map config) { +try { +return Utils.newParameterizedInstance( Review Comment: Yes, however, based on my understanding `Configurable` class, 1. `Configurable` usually has no logic in the constructor and doesn't have any parameters passed to the constructor either. Instead, configurations are passed to `Configurable.configure(Map props)` which `getConfiguredInstance` calls after initialize an instance of `Configurable`. 2. Subclasses of `Configurable` aren't forced to implement `configure` and also aren't forced to call `super.configure` as part of the implementation of `configure`. I can change it to `Configurable` if you prefer this, but my point is that `Configurable` doesn't force the subclasses to inheart the initialization or configure logic as they are part of `configure` and not the constructor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1015617856 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java: ## @@ -0,0 +1,553 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter; +import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; +import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.MirrorSourceConnector; +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata; +import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.connect.util.clusters.UngracefulShutdownException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS; +import static org.apache.kafka.connect.mirror.TestUtils.generateRecords; +import static org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.basicMM2Config; +import static org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests MM2 is using provided ForwardingAdmin to create/alter topics, partitions and ACLs. + */ +@Tag("integration") +public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest { Review Comment: The test now extends `MirrorConnectorsIntegrationBaseTest`. I also fixed the following tests - `Iden
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1015481020 ## connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java: ## @@ -69,6 +74,17 @@ public ReplicationPolicy replicationPolicy() { return getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class); } +@SuppressWarnings("unchecked") +ForwardingAdmin getForwardingAdmin(Map config) { +try { +return Utils.newParameterizedInstance( Review Comment: Yes, however, based on my understanding `Configurable` class, 1. `Configurable` usually has no logic in the constructor and doesn't have any parameters passed to the constructor either. Instead, configurations are passed to `Configurable.configure(Map props)` which `getConfiguredInstance` calls after initialize an instance of `Configurable`. 2. Subclasses of `Configurable` aren't forced to implement `configure` and also aren't forced to call `super.configure` as part of the implementation of `configure`. I can change it to `Configurable` if you prefer this, but my point is that `Configurable` doesn't force the subclasses to inheart the initialization or configure logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1012326106 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java: ## @@ -0,0 +1,553 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter; +import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; +import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.MirrorSourceConnector; +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata; +import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.connect.util.clusters.UngracefulShutdownException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS; +import static org.apache.kafka.connect.mirror.TestUtils.generateRecords; +import static org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.basicMM2Config; +import static org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests MM2 is using provided ForwardingAdmin to create/alter topics, partitions and ACLs. + */ +@Tag("integration") +public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest { Review Comment: yes this would be better than raising another pr to use `MirrorConnectorsIntegrationBaseTest` for the pr te
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1009669418 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java: ## @@ -0,0 +1,553 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter; +import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; +import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.MirrorSourceConnector; +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata; +import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.connect.util.clusters.UngracefulShutdownException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS; +import static org.apache.kafka.connect.mirror.TestUtils.generateRecords; +import static org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.basicMM2Config; +import static org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests MM2 is using provided ForwardingAdmin to create/alter topics, partitions and ACLs. + */ +@Tag("integration") +public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest { Review Comment: @mimaison I created this jira [https://issues.apache.org/jira/browse/KAFKA-14344](https://issues.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1008151429 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java: ## @@ -82,15 +84,17 @@ public void testClientConfigProperties() { "config.providers", "fake", "config.providers.fake.class", FakeConfigProvider.class.getName(), "replication.policy.separator", "__", -"ssl.truststore.password", "secret1", "ssl.key.password", "${fake:secret:password}", // resolves to "secret2" -"security.protocol", "SSL", -"a.security.protocol", "PLAINTEXT", +"security.protocol", "SSL", +"a.ssl.truststore.password", "secret1", Review Comment: removed `ssl.truststore.password` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1008151052 ## clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java: ## @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicCollection; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.quota.ClientQuotaAlteration; +import org.apache.kafka.common.quota.ClientQuotaFilter; + +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * {@code ForwardingAdmin} is the default value of {@code forwarding.admin.class} in MM2. Review Comment: edited it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1008149257 ## connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java: ## @@ -56,7 +58,11 @@ public class MirrorClientConfig extends AbstractConfig { private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention."; public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT = DefaultReplicationPolicy.SEPARATOR_DEFAULT; - + +public static final String FORWARDING_ADMIN_CLASS = "forwarding.admin.class"; +public static final String FORWARDING_ADMIN_CLASS_DOC = "Class which extends ForwardingAdmin to define custom cluster resource management (topics, configs, etc). " + +"The class must have a contractor that accept configuration (Map config) to configure KafkaAdminClient and any other needed clients."; Review Comment: fixed the typo -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1008147190 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java: ## @@ -0,0 +1,553 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter; +import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; +import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.MirrorSourceConnector; +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata; +import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.connect.util.clusters.UngracefulShutdownException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS; +import static org.apache.kafka.connect.mirror.TestUtils.generateRecords; +import static org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.basicMM2Config; +import static org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests MM2 is using provided ForwardingAdmin to create/alter topics, partitions and ACLs. + */ +@Tag("integration") +public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest { Review Comment: Yeah, I think this will be better. I can open a jira ticket and raise another PR to have an easier way to s
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1008147190 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java: ## @@ -0,0 +1,553 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter; +import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; +import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.MirrorSourceConnector; +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata; +import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.connect.util.clusters.UngracefulShutdownException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS; +import static org.apache.kafka.connect.mirror.TestUtils.generateRecords; +import static org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.basicMM2Config; +import static org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests MM2 is using provided ForwardingAdmin to create/alter topics, partitions and ACLs. + */ +@Tag("integration") +public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest { Review Comment: Yeah, I think this will be better. I can open a Jira and raise another PR to have an easier way to set up `
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1008074680 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java: ## @@ -82,15 +84,17 @@ public void testClientConfigProperties() { "config.providers", "fake", "config.providers.fake.class", FakeConfigProvider.class.getName(), "replication.policy.separator", "__", -"ssl.truststore.password", "secret1", "ssl.key.password", "${fake:secret:password}", // resolves to "secret2" -"security.protocol", "SSL", -"a.security.protocol", "PLAINTEXT", +"security.protocol", "SSL", +"a.ssl.truststore.password", "secret1", Review Comment: I can remove the ` "ssl.truststore.password", "secret1", ` and any test for this config. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1008071035 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java: ## @@ -82,15 +84,17 @@ public void testClientConfigProperties() { "config.providers", "fake", "config.providers.fake.class", FakeConfigProvider.class.getName(), "replication.policy.separator", "__", -"ssl.truststore.password", "secret1", "ssl.key.password", "${fake:secret:password}", // resolves to "secret2" -"security.protocol", "SSL", -"a.security.protocol", "PLAINTEXT", +"security.protocol", "SSL", +"a.ssl.truststore.password", "secret1", Review Comment: The code already had `ssl.truststore.password` with `PLAINTEXT` before for both clusters `a` and `b`. https://github.com/apache/kafka/blob/6ab4d047d563e0fe42a7c0ed6f10ddecda135595/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java#L85 ``` "ssl.truststore.password", "secret1", "ssl.key.password", "${fake:secret:password}", // resolves to "secret2" "security.protocol", "SSL", "a.security.protocol", "PLAINTEXT", ``` This set of configs is wrong anyway as it set `truststore.password` without a truststore and as you mentioned it has protocol as `PLAINTEXT`. And If we tried to initialise any client with these configs it fail with `org.apache.kafka.common.errors.InvalidConfigurationException: SSL trust store is not specified, but trust store password is specified.` However, `testClientConfigProperties` isn't testing if a set of configs are valid or not instead the test case is testing if the MM2 is loading configs prefixed by cluster alias. On the other side my test is trying to check if `bClientConfig.forwardingAdmin(bClientConfig.adminConfig())` is instance of `FakeForwardingAdmin` to do this I can't use the original configs that set`"ssl.truststore.password", "secret1",` to both clusters. So I just limited it to cluster `a` and kept the rest of the assertions in the test case https://github.com/apache/kafka/blob/6ab4d047d563e0fe42a7c0ed6f10ddecda135595/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java#L112 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1008071035 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java: ## @@ -82,15 +84,17 @@ public void testClientConfigProperties() { "config.providers", "fake", "config.providers.fake.class", FakeConfigProvider.class.getName(), "replication.policy.separator", "__", -"ssl.truststore.password", "secret1", "ssl.key.password", "${fake:secret:password}", // resolves to "secret2" -"security.protocol", "SSL", -"a.security.protocol", "PLAINTEXT", +"security.protocol", "SSL", +"a.ssl.truststore.password", "secret1", Review Comment: The code already had `ssl.truststore.password` with `PLAINTEXT` before for both clusters `a` and `b`. https://github.com/apache/kafka/blob/6ab4d047d563e0fe42a7c0ed6f10ddecda135595/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java#L85 ``` "ssl.truststore.password", "secret1", "ssl.key.password", "${fake:secret:password}", // resolves to "secret2" "security.protocol", "SSL", "a.security.protocol", "PLAINTEXT", ``` This set of configs is wrong anyway as it set `truststore.password` without a truststore and as you mentioned it has protocol as `PLAINTEXT`. And If we tried to initialise any client with these configs it fail with `org.apache.kafka.common.errors.InvalidConfigurationException: SSL trust store is not specified, but trust store password is specified.` However, `testClientConfigProperties` isn't testing if a set of configs are valid or not instead the test case is testing if the cluster's clients are configured using any config prefixed by cluster alias. On the other side my test is trying to check if `bClientConfig.forwardingAdmin(bClientConfig.adminConfig())` is instance of `FakeForwardingAdmin` to do this I can't use the original configs that set`"ssl.truststore.password", "secret1",` to both clusters. So I just limited it to cluster `a` and kept the rest of the assertions in the test case https://github.com/apache/kafka/blob/6ab4d047d563e0fe42a7c0ed6f10ddecda135595/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java#L112 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1008047026 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java: ## @@ -0,0 +1,553 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter; +import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; +import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.MirrorSourceConnector; +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata; +import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.connect.util.clusters.UngracefulShutdownException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS; +import static org.apache.kafka.connect.mirror.TestUtils.generateRecords; +import static org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.basicMM2Config; +import static org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests MM2 is using provided ForwardingAdmin to create/alter topics, partitions and ACLs. + */ +@Tag("integration") +public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest { Review Comment: Setting up `EmbededCluster` with `AclAuthorizer` to test ACLs sync was a lot of work, this is why I separat
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1007434322 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java: ## @@ -0,0 +1,596 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter; +import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; +import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.MirrorSourceConnector; +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata; +import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.connect.util.clusters.UngracefulShutdownException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS; +import static org.apache.kafka.connect.mirror.TestUtils.generateRecords; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests MM2 is using provided ForwardingAdmin to create/alter topics, partitions and ACLs. + */ +@Tag("integration") +public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest { +protected static final int NUM_RECORDS_PER_PARTITION = 1; +protected static final int NUM_PARTITIONS = 1; +protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION; +protected static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500L); +protected static final String PRIMARY_CLUSTER_ALIAS = "pr
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1007424805 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java: ## @@ -0,0 +1,596 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter; +import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; +import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.MirrorSourceConnector; +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata; +import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.connect.util.clusters.UngracefulShutdownException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.FORWARDING_ADMIN_CLASS; +import static org.apache.kafka.connect.mirror.TestUtils.generateRecords; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests MM2 is using provided ForwardingAdmin to create/alter topics, partitions and ACLs. + */ +@Tag("integration") +public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest { +protected static final int NUM_RECORDS_PER_PARTITION = 1; +protected static final int NUM_PARTITIONS = 1; +protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION; +protected static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500L); +protected static final String PRIMARY_CLUSTER_ALIAS = "pr
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1007412233 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java: ## @@ -0,0 +1,596 @@ +/* Review Comment: Fixed the checkstyle `./gradlew checkstyleMain checkstyleTest` should now successed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1007412233 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java: ## @@ -0,0 +1,596 @@ +/* Review Comment: Fixed the checkstyle `./gradlew checkstyleMain checkstyleTest` should now be green -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1007411497 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeLocalMetadataStore.java: ## @@ -0,0 +1,15 @@ +package org.apache.kafka.connect.mirror.clients.admin; Review Comment: Added the license ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java: ## @@ -0,0 +1,117 @@ +package org.apache.kafka.connect.mirror.clients.admin; Review Comment: Added the license -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r998207034 ## clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java: ## @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicCollection; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.quota.ClientQuotaAlteration; +import org.apache.kafka.common.quota.ClientQuotaFilter; + +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * {@code ForwardingAdmin} is the default value of {@code forwarding.admin.class} in MM2. + * Users who wish to customize the MM2 behaviour for the creation of topics and access control lists can extend this + * class without needing to provide a whole implementation of {@code Admin}. + * The class must have a constructor with signature {@code (Map config)} for configuring + * a decorated {@link KafkaAdminClient} and any other clients needed for external resource management. + */ +public class ForwardingAdmin implements Admin { +private final Admin delegate; + +public ForwardingAdmin(Map configs) { +this.delegate = Admin.create(configs); +} + +@Override +public void close(Duration timeout) { +delegate.close(); Review Comment: yes, good catch. Update it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r998206276 ## connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java: ## @@ -56,7 +58,10 @@ public class MirrorClientConfig extends AbstractConfig { private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention."; public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT = DefaultReplicationPolicy.SEPARATOR_DEFAULT; - + +public static final String FORWARDING_ADMIN_CLASS = "forwarding.admin.class"; +public static final String FORWARDING_ADMIN_CLASS_DOC = "Class which extends ForwardingAdmin to define custom cluster resource management (topics, configs, etc)."; Review Comment: update the description -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r994785155 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java: ## @@ -92,6 +94,9 @@ public class MirrorConnectorConfig extends AbstractConfig { private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention."; public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT = MirrorClientConfig.REPLICATION_POLICY_SEPARATOR_DEFAULT; +public static final String FORWARDING_ADMIN_CLASS = MirrorClientConfig.FORWARDING_ADMIN_CLASS; Review Comment: I'm planning to raise another KIP for Connect and see how the community feels about it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r992140426 ## clients/src/test/java/org/apache/kafka/clients/admin/FakeForwardingAdmin.java: ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import java.util.Map; + +public class FakeForwardingAdmin extends ForwardingAdmin { Review Comment: `FakeForwardingAdmin` is here because the `ForwardingAdmin` is in `clients/admin` but I don't mind moving them to connect/mirror at this point. they are in `clients/admin` based on some suggestions on the KIP discussion on the mailing list to have forwarding admin pluggable in other clients in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r992140426 ## clients/src/test/java/org/apache/kafka/clients/admin/FakeForwardingAdmin.java: ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import java.util.Map; + +public class FakeForwardingAdmin extends ForwardingAdmin { Review Comment: hmm, `FakeForwardingAdmin` is here because the `ForwardingAdmin` is in `clients/admin` but I don't mind moving them to connect/mirror at this point. they are in `clients/admin` based on some suggestions on the KIP discussion on the mailing list to have forwarding admin pluggable in other clients in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r991579817 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java: ## @@ -436,6 +441,17 @@ ReplicationPolicy replicationPolicy() { return getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class); } +@SuppressWarnings("unchecked") +ForwardingAdmin getForwardingAdmin(Map config) { +try { +return Utils.newParameterizedInstance( +getClass(FORWARDING_ADMIN_CLASS).getName(), (Class>) (Class) Map.class, config Review Comment: I added `rawtypes`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r991577963 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -79,8 +80,8 @@ public class MirrorSourceConnector extends SourceConnector { private List knownTargetTopicPartitions = Collections.emptyList(); private ReplicationPolicy replicationPolicy; private int replicationFactor; -private AdminClient sourceAdminClient; -private AdminClient targetAdminClient; +private ForwardingAdmin sourceAdminClient; +private ForwardingAdmin targetAdminClient; Review Comment: Same as the others. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r991569076 ## connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java: ## @@ -56,7 +56,7 @@ public class MirrorClient implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(MirrorClient.class); -private final AdminClient adminClient; +private final ForwardingAdmin adminClient; Review Comment: Also, the original MM2 codebase used to use Admin in the same places and AdminClient in others. So `Admin` will be better for consistency. ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -47,7 +47,7 @@ public class MirrorCheckpointConnector extends SourceConnector { private Scheduler scheduler; private MirrorConnectorConfig config; private GroupFilter groupFilter; -private AdminClient sourceAdminClient; +private ForwardingAdmin sourceAdminClient; Review Comment: Same as the other comment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r991567235 ## connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java: ## @@ -56,7 +56,7 @@ public class MirrorClient implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(MirrorClient.class); -private final AdminClient adminClient; +private final ForwardingAdmin adminClient; Review Comment: I changed the type to `Admin` instead of `AdminClient` as `AdminClient` and `ForwardingAdmin` both implement `Admin` interface. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r991567049 ## clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java: ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicCollection; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.quota.ClientQuotaAlteration; +import org.apache.kafka.common.quota.ClientQuotaFilter; + +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class ForwardingAdmin implements Admin { +private final Admin delegate; + +public ForwardingAdmin(Map configs) { +this.delegate = AdminClient.create(configs); Review Comment: Switched it to `Admin.create` instead of `AdminClient` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r991566688 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ## @@ -98,19 +112,51 @@ static Pattern compilePatternList(String fields) { return compilePatternList(Arrays.asList(fields.split("\\W*,\\W*"))); } -static void createCompactedTopic(String topicName, short partitions, short replicationFactor, Map adminProps) { +static void createCompactedTopic(String topicName, short partitions, short replicationFactor, ForwardingAdmin forwardingAdmin) { NewTopic topicDescription = TopicAdmin.defineTopic(topicName). compacted(). partitions(partitions). replicationFactor(replicationFactor). build(); -try (TopicAdmin admin = new TopicAdmin(adminProps)) { -admin.createTopics(topicDescription); +CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false); +try { +forwardingAdmin.createTopics(singleton(topicDescription), args).values().get(topicName).get(); +log.info("Created topic", topicName); +} catch (InterruptedException e) { +Thread.interrupted(); +throw new ConnectException("Interrupted while attempting to create/find topic '" + topicName + "'", e); +} catch (ExecutionException e) { +Throwable cause = e.getCause(); +if (cause instanceof TopicExistsException) { +log.debug("Found existing topic '{}'", topicName); +} +if (cause instanceof UnsupportedVersionException) { +log.debug("Unable to create topic(s) '{}' since the brokers do not support the CreateTopics API." + +" Falling back to assume topic exist or will be auto-created by the broker.", +topicName); +} +if (cause instanceof ClusterAuthorizationException) { +log.debug("Not authorized to create topic '{}' upon the brokers." + +" Falling back to assume topic(s) exist or will be auto-created by the broker.", +topicName); +} +if (cause instanceof InvalidConfigurationException) { +throw new ConnectException("Unable to create topic '" + topicName + "': " + cause.getMessage(), +cause); +} +if (cause instanceof TimeoutException) { +// Timed out waiting for the operation to complete Review Comment: Same here as well, this comment is added in `TopicAdmin.createOrFindTopics` so I kept it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r991566853 ## clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java: ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicCollection; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.quota.ClientQuotaAlteration; +import org.apache.kafka.common.quota.ClientQuotaFilter; + +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class ForwardingAdmin implements Admin { Review Comment: Added Javadoc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r991566108 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ## @@ -98,19 +112,51 @@ static Pattern compilePatternList(String fields) { return compilePatternList(Arrays.asList(fields.split("\\W*,\\W*"))); } -static void createCompactedTopic(String topicName, short partitions, short replicationFactor, Map adminProps) { +static void createCompactedTopic(String topicName, short partitions, short replicationFactor, ForwardingAdmin forwardingAdmin) { NewTopic topicDescription = TopicAdmin.defineTopic(topicName). compacted(). partitions(partitions). replicationFactor(replicationFactor). build(); -try (TopicAdmin admin = new TopicAdmin(adminProps)) { -admin.createTopics(topicDescription); +CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false); +try { +forwardingAdmin.createTopics(singleton(topicDescription), args).values().get(topicName).get(); +log.info("Created topic", topicName); +} catch (InterruptedException e) { +Thread.interrupted(); +throw new ConnectException("Interrupted while attempting to create/find topic '" + topicName + "'", e); +} catch (ExecutionException e) { +Throwable cause = e.getCause(); +if (cause instanceof TopicExistsException) { +log.debug("Found existing topic '{}'", topicName); +} +if (cause instanceof UnsupportedVersionException) { +log.debug("Unable to create topic(s) '{}' since the brokers do not support the CreateTopics API." + +" Falling back to assume topic exist or will be auto-created by the broker.", +topicName); +} +if (cause instanceof ClusterAuthorizationException) { +log.debug("Not authorized to create topic '{}' upon the brokers." + +" Falling back to assume topic(s) exist or will be auto-created by the broker.", +topicName); +} +if (cause instanceof InvalidConfigurationException) { Review Comment: This is the same error message in `TopicAdmin.createOrFindTopics` as I am trying to keep the same error handling behaviour and logs for customers while switching to use ForwardingAdmin instead of `Admin`, `AdminClient` or `TopicAdmin` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r991565995 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ## @@ -98,19 +112,51 @@ static Pattern compilePatternList(String fields) { return compilePatternList(Arrays.asList(fields.split("\\W*,\\W*"))); } -static void createCompactedTopic(String topicName, short partitions, short replicationFactor, Map adminProps) { +static void createCompactedTopic(String topicName, short partitions, short replicationFactor, ForwardingAdmin forwardingAdmin) { NewTopic topicDescription = TopicAdmin.defineTopic(topicName). compacted(). partitions(partitions). replicationFactor(replicationFactor). build(); -try (TopicAdmin admin = new TopicAdmin(adminProps)) { -admin.createTopics(topicDescription); +CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false); +try { +forwardingAdmin.createTopics(singleton(topicDescription), args).values().get(topicName).get(); +log.info("Created topic", topicName); Review Comment: fixed this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r991565793 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ## @@ -98,19 +112,51 @@ static Pattern compilePatternList(String fields) { return compilePatternList(Arrays.asList(fields.split("\\W*,\\W*"))); } -static void createCompactedTopic(String topicName, short partitions, short replicationFactor, Map adminProps) { +static void createCompactedTopic(String topicName, short partitions, short replicationFactor, ForwardingAdmin forwardingAdmin) { NewTopic topicDescription = TopicAdmin.defineTopic(topicName). compacted(). partitions(partitions). replicationFactor(replicationFactor). build(); -try (TopicAdmin admin = new TopicAdmin(adminProps)) { -admin.createTopics(topicDescription); +CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false); Review Comment: You're right I kept it as `TopicAdmin.createOrFindTopics()` is using `validateOnly` and am trying to keep the same logic while am switching the code to use ForwardingAdmin instead. ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ## @@ -98,19 +112,51 @@ static Pattern compilePatternList(String fields) { return compilePatternList(Arrays.asList(fields.split("\\W*,\\W*"))); } -static void createCompactedTopic(String topicName, short partitions, short replicationFactor, Map adminProps) { +static void createCompactedTopic(String topicName, short partitions, short replicationFactor, ForwardingAdmin forwardingAdmin) { NewTopic topicDescription = TopicAdmin.defineTopic(topicName). compacted(). partitions(partitions). replicationFactor(replicationFactor). build(); -try (TopicAdmin admin = new TopicAdmin(adminProps)) { -admin.createTopics(topicDescription); +CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false); Review Comment: You're right I kept it as `TopicAdmin.createOrFindTopics()` is using `validateOnly` and am trying to keep the same logic while I'm switching the code to use ForwardingAdmin instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r991564993 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ## @@ -98,19 +112,51 @@ static Pattern compilePatternList(String fields) { return compilePatternList(Arrays.asList(fields.split("\\W*,\\W*"))); } -static void createCompactedTopic(String topicName, short partitions, short replicationFactor, Map adminProps) { +static void createCompactedTopic(String topicName, short partitions, short replicationFactor, ForwardingAdmin forwardingAdmin) { NewTopic topicDescription = TopicAdmin.defineTopic(topicName). compacted(). partitions(partitions). replicationFactor(replicationFactor). build(); -try (TopicAdmin admin = new TopicAdmin(adminProps)) { -admin.createTopics(topicDescription); +CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false); +try { +forwardingAdmin.createTopics(singleton(topicDescription), args).values().get(topicName).get(); +log.info("Created topic", topicName); +} catch (InterruptedException e) { +Thread.interrupted(); +throw new ConnectException("Interrupted while attempting to create/find topic '" + topicName + "'", e); +} catch (ExecutionException e) { +Throwable cause = e.getCause(); +if (cause instanceof TopicExistsException) { +log.debug("Found existing topic '{}'", topicName); +} +if (cause instanceof UnsupportedVersionException) { +log.debug("Unable to create topic(s) '{}' since the brokers do not support the CreateTopics API." + +" Falling back to assume topic exist or will be auto-created by the broker.", +topicName); +} +if (cause instanceof ClusterAuthorizationException) { +log.debug("Not authorized to create topic '{}' upon the brokers." + Review Comment: Removed it. Was copied from `TopicAdmin`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r991564622 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ## @@ -98,19 +112,51 @@ static Pattern compilePatternList(String fields) { return compilePatternList(Arrays.asList(fields.split("\\W*,\\W*"))); } -static void createCompactedTopic(String topicName, short partitions, short replicationFactor, Map adminProps) { +static void createCompactedTopic(String topicName, short partitions, short replicationFactor, ForwardingAdmin forwardingAdmin) { NewTopic topicDescription = TopicAdmin.defineTopic(topicName). compacted(). partitions(partitions). replicationFactor(replicationFactor). build(); -try (TopicAdmin admin = new TopicAdmin(adminProps)) { -admin.createTopics(topicDescription); +CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false); +try { +forwardingAdmin.createTopics(singleton(topicDescription), args).values().get(topicName).get(); +log.info("Created topic", topicName); +} catch (InterruptedException e) { +Thread.interrupted(); +throw new ConnectException("Interrupted while attempting to create/find topic '" + topicName + "'", e); +} catch (ExecutionException e) { +Throwable cause = e.getCause(); +if (cause instanceof TopicExistsException) { +log.debug("Found existing topic '{}'", topicName); +} +if (cause instanceof UnsupportedVersionException) { +log.debug("Unable to create topic(s) '{}' since the brokers do not support the CreateTopics API." + Review Comment: fixed. ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ## @@ -98,19 +112,51 @@ static Pattern compilePatternList(String fields) { return compilePatternList(Arrays.asList(fields.split("\\W*,\\W*"))); } -static void createCompactedTopic(String topicName, short partitions, short replicationFactor, Map adminProps) { +static void createCompactedTopic(String topicName, short partitions, short replicationFactor, ForwardingAdmin forwardingAdmin) { NewTopic topicDescription = TopicAdmin.defineTopic(topicName). compacted(). partitions(partitions). replicationFactor(replicationFactor). build(); -try (TopicAdmin admin = new TopicAdmin(adminProps)) { -admin.createTopics(topicDescription); +CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false); +try { +forwardingAdmin.createTopics(singleton(topicDescription), args).values().get(topicName).get(); +log.info("Created topic", topicName); +} catch (InterruptedException e) { +Thread.interrupted(); +throw new ConnectException("Interrupted while attempting to create/find topic '" + topicName + "'", e); +} catch (ExecutionException e) { +Throwable cause = e.getCause(); +if (cause instanceof TopicExistsException) { +log.debug("Found existing topic '{}'", topicName); +} +if (cause instanceof UnsupportedVersionException) { +log.debug("Unable to create topic(s) '{}' since the brokers do not support the CreateTopics API." + +" Falling back to assume topic exist or will be auto-created by the broker.", Review Comment: fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r991564465 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ## @@ -98,19 +112,51 @@ static Pattern compilePatternList(String fields) { return compilePatternList(Arrays.asList(fields.split("\\W*,\\W*"))); } -static void createCompactedTopic(String topicName, short partitions, short replicationFactor, Map adminProps) { +static void createCompactedTopic(String topicName, short partitions, short replicationFactor, ForwardingAdmin forwardingAdmin) { NewTopic topicDescription = TopicAdmin.defineTopic(topicName). compacted(). partitions(partitions). replicationFactor(replicationFactor). build(); -try (TopicAdmin admin = new TopicAdmin(adminProps)) { -admin.createTopics(topicDescription); +CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false); +try { +forwardingAdmin.createTopics(singleton(topicDescription), args).values().get(topicName).get(); +log.info("Created topic", topicName); +} catch (InterruptedException e) { +Thread.interrupted(); +throw new ConnectException("Interrupted while attempting to create/find topic '" + topicName + "'", e); +} catch (ExecutionException e) { +Throwable cause = e.getCause(); +if (cause instanceof TopicExistsException) { +log.debug("Found existing topic '{}'", topicName); Review Comment: updated the log message. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r991564315 ## connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java: ## @@ -69,6 +74,17 @@ public ReplicationPolicy replicationPolicy() { return getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class); } +@SuppressWarnings("unchecked") +ForwardingAdmin getForwardingAdmin(Map config) { +try { +return Utils.newParameterizedInstance( Review Comment: `getConfiguredInstance` works only with `Configurable` the problem with using `Configurable` for this case in my opinion is that it doesn't force inheriting the superclass initializing behaviour. In the case, the behaviour we want all extended classes to inherit is the initializing of the `delegate = Admin.create(config)` so I used `Utils.newParameterizedInstance` ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -17,6 +17,8 @@ package org.apache.kafka.connect.mirror; import java.util.Map.Entry; + +import org.apache.kafka.clients.admin.ForwardingAdmin; Review Comment: Fixed this to `Admin` instead of `ForwardingAdmin` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r991564134 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java: ## @@ -436,6 +441,17 @@ ReplicationPolicy replicationPolicy() { return getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class); } +@SuppressWarnings("unchecked") +ForwardingAdmin getForwardingAdmin(Map config) { Review Comment: removed the prefix ## connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java: ## @@ -69,6 +74,17 @@ public ReplicationPolicy replicationPolicy() { return getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class); } +@SuppressWarnings("unchecked") +ForwardingAdmin getForwardingAdmin(Map config) { Review Comment: removed the prefix -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org