C0urante commented on code in PR #11781:
URL: https://github.com/apache/kafka/pull/11781#discussion_r899705167


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##########
@@ -1279,6 +1288,455 @@ public void 
testAdminConfigsClientOverridesWithNonePolicy() {
         
verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX);
     }
 
+    @Test
+    public void testRegularSourceOffsetsConsumerConfigs() {
+        final Map<String, Object> connectorConsumerOverrides = new HashMap<>();
+        
when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).thenReturn(connectorConsumerOverrides);
+
+        Map<String, String> workerProps = new HashMap<>(this.workerProps);
+        workerProps.put("exactly.once.source.support", "enabled");
+        workerProps.put("bootstrap.servers", "localhost:4761");
+        workerProps.put("group.id", "connect-cluster");
+        workerProps.put("config.storage.topic", "connect-configs");
+        workerProps.put("offset.storage.topic", "connect-offsets");
+        workerProps.put("status.storage.topic", "connect-statuses");
+        config = new DistributedConfig(workerProps);
+
+        Map<String, Object> consumerConfigs = 
Worker.regularSourceOffsetsConsumerConfigs(
+                "test", "", config, connectorConfig, null, 
allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+        assertEquals("localhost:4761", 
consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+        assertEquals("read_committed", 
consumerConfigs.get(ISOLATION_LEVEL_CONFIG));
+
+        workerProps.put("consumer." + BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9021");
+        workerProps.put("consumer." + ISOLATION_LEVEL_CONFIG, 
"read_uncommitted");
+        config = new DistributedConfig(workerProps);
+        consumerConfigs = Worker.regularSourceOffsetsConsumerConfigs(
+                "test", "", config, connectorConfig, null, 
allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+        assertEquals("localhost:9021", 
consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+        // User is allowed to override the isolation level for regular 
(non-exactly-once) source connectors and their tasks
+        assertEquals("read_uncommitted", 
consumerConfigs.get(ISOLATION_LEVEL_CONFIG));
+
+        workerProps.remove("consumer." + ISOLATION_LEVEL_CONFIG);
+        connectorConsumerOverrides.put(BOOTSTRAP_SERVERS_CONFIG, 
"localhost:489");
+        connectorConsumerOverrides.put(ISOLATION_LEVEL_CONFIG, 
"read_uncommitted");
+        config = new DistributedConfig(workerProps);
+        consumerConfigs = Worker.regularSourceOffsetsConsumerConfigs(
+                "test", "", config, connectorConfig, null, 
allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+        assertEquals("localhost:489", 
consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+        // User is allowed to override the isolation level for regular 
(non-exactly-once) source connectors and their tasks
+        assertEquals("read_uncommitted", 
consumerConfigs.get(ISOLATION_LEVEL_CONFIG));
+    }
+
+    @Test
+    public void testExactlyOnceSourceOffsetsConsumerConfigs() {
+        final Map<String, Object> connectorConsumerOverrides = new HashMap<>();
+        
when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).thenReturn(connectorConsumerOverrides);
+
+        Map<String, String> workerProps = new HashMap<>(this.workerProps);
+        workerProps.put("exactly.once.source.support", "enabled");
+        workerProps.put("bootstrap.servers", "localhost:4761");
+        workerProps.put("group.id", "connect-cluster");
+        workerProps.put("config.storage.topic", "connect-configs");
+        workerProps.put("offset.storage.topic", "connect-offsets");
+        workerProps.put("status.storage.topic", "connect-statuses");
+        config = new DistributedConfig(workerProps);
+
+        Map<String, Object> consumerConfigs = 
Worker.exactlyOnceSourceOffsetsConsumerConfigs(
+                "test", "", config, connectorConfig, null, 
allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+        assertEquals("localhost:4761", 
consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+        assertEquals("read_committed", 
consumerConfigs.get(ISOLATION_LEVEL_CONFIG));
+
+        workerProps.put("consumer." + BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9021");
+        workerProps.put("consumer." + ISOLATION_LEVEL_CONFIG, 
"read_uncommitted");
+        config = new DistributedConfig(workerProps);
+        consumerConfigs = Worker.exactlyOnceSourceOffsetsConsumerConfigs(
+                "test", "", config, connectorConfig, null, 
allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+        assertEquals("localhost:9021", 
consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+        // User is not allowed to override isolation level when exactly-once 
support is enabled
+        assertEquals("read_committed", 
consumerConfigs.get(ISOLATION_LEVEL_CONFIG));
+
+        workerProps.remove("consumer." + ISOLATION_LEVEL_CONFIG);
+        connectorConsumerOverrides.put(BOOTSTRAP_SERVERS_CONFIG, 
"localhost:489");
+        connectorConsumerOverrides.put(ISOLATION_LEVEL_CONFIG, 
"read_uncommitted");
+        config = new DistributedConfig(workerProps);
+        consumerConfigs = Worker.exactlyOnceSourceOffsetsConsumerConfigs(
+                "test", "", config, connectorConfig, null, 
allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+        assertEquals("localhost:489", 
consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+        // User is not allowed to override isolation level when exactly-once 
support is enabled
+        assertEquals("read_committed", 
consumerConfigs.get(ISOLATION_LEVEL_CONFIG));
+    }
+
+    @Test
+    public void testExactlyOnceSourceTaskProducerConfigs() {
+        final Map<String, Object> connectorProducerOverrides = new HashMap<>();
+        
when(connectorConfig.originalsWithPrefix(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).thenReturn(connectorProducerOverrides);
+
+        final String groupId = "connect-cluster";
+        final String transactionalId = Worker.taskTransactionalId(groupId, 
TASK_ID.connector(), TASK_ID.task());
+
+        Map<String, String> workerProps = new HashMap<>(this.workerProps);
+        workerProps.put("exactly.once.source.support", "enabled");
+        workerProps.put("bootstrap.servers", "localhost:4761");
+        workerProps.put("group.id", groupId);
+        workerProps.put("config.storage.topic", "connect-configs");
+        workerProps.put("offset.storage.topic", "connect-offsets");
+        workerProps.put("status.storage.topic", "connect-statuses");
+        config = new DistributedConfig(workerProps);
+
+        Map<String, Object> producerConfigs = 
Worker.exactlyOnceSourceTaskProducerConfigs(
+                TASK_ID, config, connectorConfig, null, 
allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+        assertEquals("localhost:4761", 
producerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+        assertEquals("true", producerConfigs.get(ENABLE_IDEMPOTENCE_CONFIG));
+        assertEquals(transactionalId, 
producerConfigs.get(TRANSACTIONAL_ID_CONFIG));
+
+        workerProps.put("producer." + BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9021");
+        workerProps.put("producer." + ENABLE_IDEMPOTENCE_CONFIG, "false");
+        workerProps.put("producer." + TRANSACTIONAL_ID_CONFIG, 
"some-other-transactional-id");
+        config = new DistributedConfig(workerProps);
+        producerConfigs = Worker.exactlyOnceSourceTaskProducerConfigs(
+                TASK_ID, config, connectorConfig, null, 
allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+        assertEquals("localhost:9021", 
producerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+        // User is not allowed to override idempotence or transactional ID for 
exactly-once source tasks
+        assertEquals("true", producerConfigs.get(ENABLE_IDEMPOTENCE_CONFIG));
+        assertEquals(transactionalId, 
producerConfigs.get(TRANSACTIONAL_ID_CONFIG));
+
+        workerProps.remove("producer." + ENABLE_IDEMPOTENCE_CONFIG);
+        workerProps.remove("producer." + TRANSACTIONAL_ID_CONFIG);
+        connectorProducerOverrides.put(BOOTSTRAP_SERVERS_CONFIG, 
"localhost:489");
+        connectorProducerOverrides.put(ENABLE_IDEMPOTENCE_CONFIG, "false");
+        connectorProducerOverrides.put(TRANSACTIONAL_ID_CONFIG, 
"yet-another-transactional-id");

Review Comment:
   I don't _think_ that can happen without causing other issues, since the 
user-supplied configs have come from either the JSON REST API or the Java 
properties worker file. The former doesn't deserialize things into a literal 
null, and although the latter does, as of 
https://github.com/apache/kafka/pull/11333, we fail validation for connector 
configs in that case.
   
   But, given how little additional effort it is, I don't see a problem with 
adding that check. At the very least, it helps ensure we return a 400 instead 
of a 500 response during config validation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to