mimaison commented on a change in pull request #11691: URL: https://github.com/apache/kafka/pull/11691#discussion_r791845179
########## File path: clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ########## @@ -226,6 +227,11 @@ public Password getPassword(String key) { return keys; } + public boolean hasKeyInOriginals(String configKey) { Review comment: What about `originalsContainsKey()`? ########## File path: core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala ########## @@ -1674,6 +1674,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup producerProps.put(ProducerConfig.RETRIES_CONFIG, _retries.toString) producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, _deliveryTimeoutMs.toString) producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, _requestTimeoutMs.toString) + // disable the idempotence since some tests want to test the cases when retries=0, and these tests are not testing producers + producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false.toString) Review comment: Let's use `"false"` ########## File path: clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ########## @@ -226,6 +227,11 @@ public Password getPassword(String key) { return keys; } + public boolean hasKeyInOriginals(String configKey) { + Objects.requireNonNull(configKey, "config key cannot be null"); Review comment: Do we need this null check? Shouldn't we rely on the behaviour of the underlying Map? ########## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ########## @@ -143,6 +143,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val adminClients = Buffer[Admin]() producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1") + producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false") Review comment: Could we instead drop acks=1? I don't think it is important in this test. ########## File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ########## @@ -240,6 +242,7 @@ public void testAcksAndIdempotenceForIdempotentProducers() { Properties invalidProps2 = new Properties() {{ putAll(baseProps); setProperty(ProducerConfig.ACKS_CONFIG, "1"); + // explicitly enable idempotence should still throw exception Review comment: `enable` -> `enabling` ########## File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ########## @@ -478,14 +588,22 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadat }; } - @Test - public void testMetadataFetch() throws InterruptedException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testMetadataFetch(boolean isIdempotenceEnabled) throws InterruptedException { Map<String, Object> configs = new HashMap<>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled); + ProducerMetadata metadata = mock(ProducerMetadata.class); // Return empty cluster 4 times and cluster from then on - when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, emptyCluster, emptyCluster, onePartitionCluster); + if (isIdempotenceEnabled) { Review comment: Should we change the comment above? ########## File path: clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ########## @@ -24,14 +24,15 @@ import org.slf4j.LoggerFactory; import org.apache.kafka.common.config.provider.ConfigProvider; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.HashSet; +import java.util.HashMap; +import java.util.Objects; import java.util.TreeMap; +import java.util.ArrayList; Review comment: Let's keep the current import order ########## File path: core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala ########## @@ -352,20 +359,28 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas // Verify successful produce/consume/describe on another topic using the same producer, consumer and adminClient val topic2 = "topic2" val tp2 = new TopicPartition(topic2, 0) + setReadAndWriteAcls(tp2) - sendRecords(producer, numRecords, tp2) + // in idempotence producer, we need to create another producer because the previous one is in FATEL_ERROR state (due to authorization error) Review comment: `FATEL_ERROR` -> `FATAL_ERROR` Do we really expect the idempotent producer to not be usable anymore? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java ########## @@ -170,6 +170,7 @@ public void configure(final WorkerConfig config) { producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class + producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false); // disable idempotence since retries is force to 0 Review comment: Ah :( I assumed Connect was benefiting from idempotency automatically, it looks like it isn't the case ########## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ########## @@ -2383,6 +2384,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { private def buildTransactionalProducer(): KafkaProducer[Array[Byte], Array[Byte]] = { producerConfig.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId) + producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") Review comment: Do we need to explicitly set it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org