C0urante commented on code in PR #13373: URL: https://github.com/apache/kafka/pull/13373#discussion_r1143650777
########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ########## @@ -261,6 +326,78 @@ public void testNewTopicConfigs() throws Exception { verify(connector).createNewTopics(any(), any()); } + @Test + public void testIncrementalAlterConfigsRequested() throws Exception { + MockAdminClient admin = spy(new MockAdminClient()); + MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIG, new DefaultConfigPropertyFilter(), admin)); + final String topic = "testtopic"; + List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1")); + Config config = new Config(entries); + doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); + doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any()); + doNothing().when(connector).deprecatedAlterConfigs(any()); + connector.syncTopicConfigs(); + Map<String, Config> topicConfigs = Collections.singletonMap("source." + topic, config); + verify(connector).incrementalAlterConfigs(topicConfigs); + + // the next time we sync topic configurations, expect to use the deprecated API + connector.syncTopicConfigs(); + verify(connector, times(2)).syncTopicConfigs(); Review Comment: There's no need to verify these invocations since we make both of them in this test case ourselves. (This applies to other test cases as well, such as `testIncrementalAlterConfigsRequired`.) ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -514,6 +541,40 @@ private void updateTopicConfigs(Map<String, Config> topicConfigs) { })); } + // visible for testing + @SuppressWarnings("deprecation") Review Comment: This part can be removed now (🎉) ```suggestion ``` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -540,10 +601,13 @@ Map<String, Config> describeTopicConfigs(Set<String> topics) Config targetConfig(Config sourceConfig) { List<ConfigEntry> entries = sourceConfig.entries().stream() - .filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive()) - .filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG) - .filter(x -> shouldReplicateTopicConfigurationProperty(x.name())) - .collect(Collectors.toList()); + .filter(x -> (!useIncrementalAlterConfigs.equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG) + || (x.isDefault() && shouldReplicateSourceDefault(x.name()))) + || (!x.isDefault() && useIncrementalAlterConfigs.equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG))) + .filter(x -> !x.isReadOnly() && !x.isSensitive()) + .filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG) + .filter(x -> shouldReplicateTopicConfigurationProperty(x.name())) Review Comment: This double-checks default values, which I don't believe is correct (we should rely solely on `ConfigPropertyFilter::shouldReplicateSourceDefault` for those). Perhaps all of the logic can be moved to the `shouldReplicateTopicConfigurationProperty` method, including determining whether the property is a default or not (and how to handle that accordingly)? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java: ########## @@ -30,6 +30,9 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter { public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = "config.properties.exclude"; public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = "config.properties.blacklist"; + public static final String USE_DEFAULTS_FROM = "use.defaults.from"; + private static final String USE_DEFAULTS_FROM_DOC = "Which cluster's defaults to use when syncing topic configurations."; + private static final String USE_DEFAULTS_FROM_DEFAULT = "target"; private static final String CONFIG_PROPERTIES_EXCLUDE_DOC = "List of topic configuration properties and/or regexes " + "that should not be replicated."; Review Comment: Can we add a note that this will also apply to properties with default values if `use.defaults.from` is set to `source`? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java: ########## @@ -73,6 +75,19 @@ public class MirrorSourceConfig extends MirrorConnectorConfig { public static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS = SYNC_TOPIC_CONFIGS + INTERVAL_SECONDS_SUFFIX; private static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC = "Frequency of topic config sync."; public static final long SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT = 10 * 60; + @Deprecated + public static final String USE_INCREMENTAL_ALTER_CONFIG = "use.incremental.alter.configs"; + private static final String USE_INCREMENTAL_ALTER_CONFIG_DOC = "Deprecated. Which API to use for syncing topic configs. " + + "The valid values are \"requested\", \"required\" and \"never\". " + + "By default, set to \"requested\", which means the IncrementalAlterConfigs API is being used for syncing topic configurations " + + "and if any request receives an error from an incompatible broker, it will fallback to using the deprecated AlterConfigs API. " + + "If explicitly set to \"required\", the IncrementalAlterConfigs API is used without the fallback logic and +" + + "if it receives an error from an incompatible broker, the connector will fail." + + "If explicitly set to \"never\", the AlterConfig is always used." + + "This setting will be removed and the behaviour of \"required\" will be used in Kafka 4.0, therefore users should ensure that target broker is at least 2.3.0"; Review Comment: Nit: it might be easier/cleaner to use single quotes around property names instead of double quotes, so that we don't have to use escape sequences. ```suggestion "The valid values are 'requested', 'required' and 'never'. " + "By default, set to 'requested', which means the IncrementalAlterConfigs API is being used for syncing topic configurations " + "and if any request receives an error from an incompatible broker, it will fallback to using the deprecated AlterConfigs API. " + "If explicitly set to 'required', the IncrementalAlterConfigs API is used without the fallback logic and +" + "if it receives an error from an incompatible broker, the connector will fail." + "If explicitly set to 'never', the AlterConfig is always used." + "This setting will be removed and the behaviour of 'required' will be used in Kafka 4.0, therefore users should ensure that target broker is at least 2.3.0"; ``` ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java: ########## @@ -108,16 +108,16 @@ public void testReplication() throws Exception { Map<TopicPartition, OffsetAndMetadata> backupOffsets = waitForCheckpointOnAllPartitions( backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, "test-topic-1"); - // Failover consumer group to backup cluster. + // Fail-over consumer group to back up cluster. try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) { primaryConsumer.assign(backupOffsets.keySet()); backupOffsets.forEach(primaryConsumer::seek); primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS); primaryConsumer.commitAsync(); - assertTrue(primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0, "Consumer failedover to zero offset."); + assertTrue(primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0, "Consumer failed over to zero offset."); Review Comment: This means the opposite of what it should. ```suggestion assertTrue(primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0, "Consumer should have failed over to zero offset."); ``` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -540,10 +601,13 @@ Map<String, Config> describeTopicConfigs(Set<String> topics) Config targetConfig(Config sourceConfig) { List<ConfigEntry> entries = sourceConfig.entries().stream() - .filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive()) - .filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG) - .filter(x -> shouldReplicateTopicConfigurationProperty(x.name())) - .collect(Collectors.toList()); + .filter(x -> (!useIncrementalAlterConfigs.equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG) Review Comment: Also, doesn't this result in a change in behavior for new topic creation (where we also use this method)? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -540,10 +601,13 @@ Map<String, Config> describeTopicConfigs(Set<String> topics) Config targetConfig(Config sourceConfig) { List<ConfigEntry> entries = sourceConfig.entries().stream() - .filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive()) - .filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG) - .filter(x -> shouldReplicateTopicConfigurationProperty(x.name())) - .collect(Collectors.toList()); + .filter(x -> (!useIncrementalAlterConfigs.equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG) Review Comment: Isn't it possible that `useIncrementalAlterConfigs` changes between when we read it here and when we read it in `updateTopicConfigs`? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java: ########## @@ -27,6 +27,13 @@ public interface ConfigPropertyFilter extends Configurable, AutoCloseable { boolean shouldReplicateConfigProperty(String prop); Review Comment: Can we add a note to this method that it does not handle replication of default properties, and that that logic is instead controlled by `shouldReplicateSourceDefault`? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -514,6 +541,40 @@ private void updateTopicConfigs(Map<String, Config> topicConfigs) { })); } + // visible for testing + @SuppressWarnings("deprecation") + void incrementalAlterConfigs(Map<String, Config> topicConfigs) { + Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>(); + for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) { + Collection<AlterConfigOp> ops = new ArrayList<>(); + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey()); + for (ConfigEntry config : topicConfig.getValue().entries()) { + if (config.isDefault() && !shouldReplicateSourceDefault(config.name())) { + ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.DELETE)); + } else { + ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.SET)); + } + } + configOps.put(configResource, ops); + } + log.trace("Syncing configs for {} topics.", configOps.size()); + targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> { + if (e != null) { + if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIG) + && e instanceof UnsupportedVersionException) { + //Fallback logic + log.warn("The target cluster {} is not compatible with IncrementalAlterConfigs API. Therefore using deprecated AlterConfigs API for syncing topic configurations", sourceAndTarget.target(), e); + useIncrementalAlterConfigs = MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG; + } else if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIG) + && e instanceof UnsupportedVersionException) { + context.raiseError(new UnsupportedVersionException("use.incremental.alter.configs was set to \"required\" however the target cluster is not compatible with IncrementalAlterConfigs API")); Review Comment: Two small things: 1. We should log an ERROR level message when this fails (it will be redundant in most cases, but it could come in handy in case the call to `ConnectorContext::raiseError` fails) 2. We should include the original `UnsupportedVersionException` in case there's other useful information in that exception's stack trace and/or message ```suggestion log.error("Failed to sync configs for topic {} on cluster {} with IncrementalAlterConfigs API", k.name(), sourceAndTarget.target(), e); context.raiseError(new ConnectException( "use.incremental.alter.configs was set to \"required\", but the target cluster '" + sourceAndTarget.target() + "' is not compatible with IncrementalAlterConfigs API", e )); ``` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java: ########## @@ -60,6 +65,11 @@ public boolean shouldReplicateConfigProperty(String prop) { return !excluded(prop); } + @Override + public boolean shouldReplicateSourceDefault(String prop) { + return useDefaultsFrom.equals("source"); Review Comment: We should check against the `config.properties.exclude` pattern as well. ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -577,6 +641,9 @@ boolean shouldReplicateTopicConfigurationProperty(String property) { return configPropertyFilter.shouldReplicateConfigProperty(property); } + boolean shouldReplicateSourceDefault(String property) { + return configPropertyFilter.shouldReplicateSourceDefault(property); + } Review Comment: Nit: Add an extra line after method definitions ```suggestion } ``` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -96,6 +99,7 @@ public class MirrorSourceConnector extends SourceConnector { private Admin sourceAdminClient; private Admin targetAdminClient; private Admin offsetSyncsAdminClient; + private volatile String useIncrementalAlterConfigs; Review Comment: I think it may be a bit simpler to replace this with a boolean value. Most of the time, we're only concerned about which API (incremental or deprecated) that we use. The only time the distinction between requested and required comes into play is if we see an `UnsupportedVersionException` when trying to use the incremental API, and at that point, we can just use `config::useIncrementalAlterConfigs` to check the value and then (if necessary) update the value of the `useIncrementalAlterConfigs` boolean instance field. ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java: ########## @@ -108,16 +108,16 @@ public void testReplication() throws Exception { Map<TopicPartition, OffsetAndMetadata> backupOffsets = waitForCheckpointOnAllPartitions( backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, "test-topic-1"); - // Failover consumer group to backup cluster. + // Fail-over consumer group to back up cluster. Review Comment: Nit: this should actually be "fail over" (two words) to signify the verb, instead of "failover" (one word), which is the noun. For the record, I'd be fine with leaving this as-is, but if we're going to change it, we should change it to the right thing. ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java: ########## @@ -41,12 +41,12 @@ import org.junit.jupiter.api.BeforeEach; /** - * Tests MM2 replication and failover logic for {@link IdentityReplicationPolicy}. + * Tests MM2 replication and fail over logic for {@link IdentityReplicationPolicy}. Review Comment: This is fine as-is; "failover" is definitely a word. ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java: ########## @@ -151,13 +151,13 @@ public void testReplicationWithEmptyPartition() throws Exception { // produce to all test-topic-empty's partitions, except the last partition produceMessages(primary, topic, NUM_PARTITIONS - 1); - // consume before starting the connectors so we don't need to wait for discovery + // consume before starting the connectors, so we don't need to wait for discovery int expectedRecords = NUM_RECORDS_PER_PARTITION * (NUM_PARTITIONS - 1); try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic)) { waitForConsumingAllRecords(primaryConsumer, expectedRecords); } - // one way replication from primary to backup + // one way replication from primary to back up Review Comment: This is correct as-is; "backup" is the right word here. ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -540,10 +601,13 @@ Map<String, Config> describeTopicConfigs(Set<String> topics) Config targetConfig(Config sourceConfig) { List<ConfigEntry> entries = sourceConfig.entries().stream() - .filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive()) - .filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG) - .filter(x -> shouldReplicateTopicConfigurationProperty(x.name())) - .collect(Collectors.toList()); + .filter(x -> (!useIncrementalAlterConfigs.equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG) Review Comment: I think we may want to alter this method to take a boolean `incremental` (rename as you like) parameter and use that instead of checking the value of `useIncrementalAlterConfigs`. That way, when creating new topics, we can hardcode that parameter to `false`, and when syncing topics, we can check the value of `useIncrementalAlterConfigs` once in the beginning, and then use that single check to determine whether we're going to use the incremental or deprecated API. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org