C0urante commented on code in PR #11781: URL: https://github.com/apache/kafka/pull/11781#discussion_r899705167
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java: ########## @@ -1279,6 +1288,455 @@ public void testAdminConfigsClientOverridesWithNonePolicy() { verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX); } + @Test + public void testRegularSourceOffsetsConsumerConfigs() { + final Map<String, Object> connectorConsumerOverrides = new HashMap<>(); + when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).thenReturn(connectorConsumerOverrides); + + Map<String, String> workerProps = new HashMap<>(this.workerProps); + workerProps.put("exactly.once.source.support", "enabled"); + workerProps.put("bootstrap.servers", "localhost:4761"); + workerProps.put("group.id", "connect-cluster"); + workerProps.put("config.storage.topic", "connect-configs"); + workerProps.put("offset.storage.topic", "connect-offsets"); + workerProps.put("status.storage.topic", "connect-statuses"); + config = new DistributedConfig(workerProps); + + Map<String, Object> consumerConfigs = Worker.regularSourceOffsetsConsumerConfigs( + "test", "", config, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID); + assertEquals("localhost:4761", consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG)); + assertEquals("read_committed", consumerConfigs.get(ISOLATION_LEVEL_CONFIG)); + + workerProps.put("consumer." + BOOTSTRAP_SERVERS_CONFIG, "localhost:9021"); + workerProps.put("consumer." + ISOLATION_LEVEL_CONFIG, "read_uncommitted"); + config = new DistributedConfig(workerProps); + consumerConfigs = Worker.regularSourceOffsetsConsumerConfigs( + "test", "", config, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID); + assertEquals("localhost:9021", consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG)); + // User is allowed to override the isolation level for regular (non-exactly-once) source connectors and their tasks + assertEquals("read_uncommitted", consumerConfigs.get(ISOLATION_LEVEL_CONFIG)); + + workerProps.remove("consumer." + ISOLATION_LEVEL_CONFIG); + connectorConsumerOverrides.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:489"); + connectorConsumerOverrides.put(ISOLATION_LEVEL_CONFIG, "read_uncommitted"); + config = new DistributedConfig(workerProps); + consumerConfigs = Worker.regularSourceOffsetsConsumerConfigs( + "test", "", config, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID); + assertEquals("localhost:489", consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG)); + // User is allowed to override the isolation level for regular (non-exactly-once) source connectors and their tasks + assertEquals("read_uncommitted", consumerConfigs.get(ISOLATION_LEVEL_CONFIG)); + } + + @Test + public void testExactlyOnceSourceOffsetsConsumerConfigs() { + final Map<String, Object> connectorConsumerOverrides = new HashMap<>(); + when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).thenReturn(connectorConsumerOverrides); + + Map<String, String> workerProps = new HashMap<>(this.workerProps); + workerProps.put("exactly.once.source.support", "enabled"); + workerProps.put("bootstrap.servers", "localhost:4761"); + workerProps.put("group.id", "connect-cluster"); + workerProps.put("config.storage.topic", "connect-configs"); + workerProps.put("offset.storage.topic", "connect-offsets"); + workerProps.put("status.storage.topic", "connect-statuses"); + config = new DistributedConfig(workerProps); + + Map<String, Object> consumerConfigs = Worker.exactlyOnceSourceOffsetsConsumerConfigs( + "test", "", config, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID); + assertEquals("localhost:4761", consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG)); + assertEquals("read_committed", consumerConfigs.get(ISOLATION_LEVEL_CONFIG)); + + workerProps.put("consumer." + BOOTSTRAP_SERVERS_CONFIG, "localhost:9021"); + workerProps.put("consumer." + ISOLATION_LEVEL_CONFIG, "read_uncommitted"); + config = new DistributedConfig(workerProps); + consumerConfigs = Worker.exactlyOnceSourceOffsetsConsumerConfigs( + "test", "", config, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID); + assertEquals("localhost:9021", consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG)); + // User is not allowed to override isolation level when exactly-once support is enabled + assertEquals("read_committed", consumerConfigs.get(ISOLATION_LEVEL_CONFIG)); + + workerProps.remove("consumer." + ISOLATION_LEVEL_CONFIG); + connectorConsumerOverrides.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:489"); + connectorConsumerOverrides.put(ISOLATION_LEVEL_CONFIG, "read_uncommitted"); + config = new DistributedConfig(workerProps); + consumerConfigs = Worker.exactlyOnceSourceOffsetsConsumerConfigs( + "test", "", config, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID); + assertEquals("localhost:489", consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG)); + // User is not allowed to override isolation level when exactly-once support is enabled + assertEquals("read_committed", consumerConfigs.get(ISOLATION_LEVEL_CONFIG)); + } + + @Test + public void testExactlyOnceSourceTaskProducerConfigs() { + final Map<String, Object> connectorProducerOverrides = new HashMap<>(); + when(connectorConfig.originalsWithPrefix(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).thenReturn(connectorProducerOverrides); + + final String groupId = "connect-cluster"; + final String transactionalId = Worker.taskTransactionalId(groupId, TASK_ID.connector(), TASK_ID.task()); + + Map<String, String> workerProps = new HashMap<>(this.workerProps); + workerProps.put("exactly.once.source.support", "enabled"); + workerProps.put("bootstrap.servers", "localhost:4761"); + workerProps.put("group.id", groupId); + workerProps.put("config.storage.topic", "connect-configs"); + workerProps.put("offset.storage.topic", "connect-offsets"); + workerProps.put("status.storage.topic", "connect-statuses"); + config = new DistributedConfig(workerProps); + + Map<String, Object> producerConfigs = Worker.exactlyOnceSourceTaskProducerConfigs( + TASK_ID, config, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID); + assertEquals("localhost:4761", producerConfigs.get(BOOTSTRAP_SERVERS_CONFIG)); + assertEquals("true", producerConfigs.get(ENABLE_IDEMPOTENCE_CONFIG)); + assertEquals(transactionalId, producerConfigs.get(TRANSACTIONAL_ID_CONFIG)); + + workerProps.put("producer." + BOOTSTRAP_SERVERS_CONFIG, "localhost:9021"); + workerProps.put("producer." + ENABLE_IDEMPOTENCE_CONFIG, "false"); + workerProps.put("producer." + TRANSACTIONAL_ID_CONFIG, "some-other-transactional-id"); + config = new DistributedConfig(workerProps); + producerConfigs = Worker.exactlyOnceSourceTaskProducerConfigs( + TASK_ID, config, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID); + assertEquals("localhost:9021", producerConfigs.get(BOOTSTRAP_SERVERS_CONFIG)); + // User is not allowed to override idempotence or transactional ID for exactly-once source tasks + assertEquals("true", producerConfigs.get(ENABLE_IDEMPOTENCE_CONFIG)); + assertEquals(transactionalId, producerConfigs.get(TRANSACTIONAL_ID_CONFIG)); + + workerProps.remove("producer." + ENABLE_IDEMPOTENCE_CONFIG); + workerProps.remove("producer." + TRANSACTIONAL_ID_CONFIG); + connectorProducerOverrides.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:489"); + connectorProducerOverrides.put(ENABLE_IDEMPOTENCE_CONFIG, "false"); + connectorProducerOverrides.put(TRANSACTIONAL_ID_CONFIG, "yet-another-transactional-id"); Review Comment: I don't _think_ that can happen without causing other issues, since the user-supplied configs have come from either the JSON REST API or the Java properties worker file. The former doesn't deserializes things into a literal null and although the latter does, as of https://github.com/apache/kafka/pull/11333, we fail validation for connector configs in that case. But, given how little additional effort it is, I don't see a problem with adding that check. At the very least, it helps prevent a 500 instead of a 400 response during config validation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org