mimaison commented on a change in pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#discussion_r791845179



##########
File path: clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
##########
@@ -226,6 +227,11 @@ public Password getPassword(String key) {
         return keys;
     }
 
+    public boolean hasKeyInOriginals(String configKey) {

Review comment:
       What about `originalsContainsKey()`?
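      For example, something along these lines (just a sketch, assuming it simply delegates to the `originals` map):

```java
public boolean originalsContainsKey(String configKey) {
    // reports whether the key appeared in the original configs passed in by the user
    return originals.containsKey(configKey);
}
```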

##########
File path: core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
##########
@@ -1674,6 +1674,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       producerProps.put(ProducerConfig.RETRIES_CONFIG, _retries.toString)
      producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, _deliveryTimeoutMs.toString)
      producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, _requestTimeoutMs.toString)
+      // disable the idempotence since some tests want to test the cases when retries=0, and these tests are not testing producers
+      producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false.toString)

Review comment:
       Let's use `"false"`

##########
File path: clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
##########
@@ -226,6 +227,11 @@ public Password getPassword(String key) {
         return keys;
     }
 
+    public boolean hasKeyInOriginals(String configKey) {
+        Objects.requireNonNull(configKey, "config key cannot be null");

Review comment:
       Do we need this null check? Shouldn't we rely on the behaviour of the 
underlying Map?
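      For what it's worth, a plain `HashMap` handles a null lookup without throwing, so dropping the check would turn a NullPointerException into a `false` result (small illustration, assuming `originals` is a regular `HashMap`):

```java
import java.util.HashMap;
import java.util.Map;

public class NullKeyLookup {
    public static void main(String[] args) {
        Map<String, Object> originals = new HashMap<>();
        // HashMap.containsKey(null) simply returns false when no null key is present
        System.out.println(originals.containsKey(null)); // prints "false"
    }
}
```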

##########
File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##########
@@ -143,6 +143,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val adminClients = Buffer[Admin]()
 
   producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1")
+  producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")

Review comment:
       Could we instead drop acks=1? I don't think it is important in this test.

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -240,6 +242,7 @@ public void testAcksAndIdempotenceForIdempotentProducers() {
         Properties invalidProps2 = new Properties() {{
                 putAll(baseProps);
                 setProperty(ProducerConfig.ACKS_CONFIG, "1");
+                // explicitly enable idempotence should still throw exception

Review comment:
       `enable` -> `enabling`

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -478,14 +588,22 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadat
         };
     }
 
-    @Test
-    public void testMetadataFetch() throws InterruptedException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testMetadataFetch(boolean isIdempotenceEnabled) throws InterruptedException {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled);
+
         ProducerMetadata metadata = mock(ProducerMetadata.class);
 
         // Return empty cluster 4 times and cluster from then on
-        when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, emptyCluster, emptyCluster, onePartitionCluster);
+        if (isIdempotenceEnabled) {

Review comment:
       Should we change the comment above? 
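      Maybe something along these lines (just a wording suggestion):

```java
// Return the empty cluster for the first few fetches (the exact count depends on
// whether idempotence is enabled) and the one-partition cluster from then on
```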

##########
File path: clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
##########
@@ -24,14 +24,15 @@
 import org.slf4j.LoggerFactory;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Objects;
 import java.util.TreeMap;
+import java.util.ArrayList;

Review comment:
       Let's keep the current import order

##########
File path: core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
##########
@@ -352,20 +359,28 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     // Verify successful produce/consume/describe on another topic using the same producer, consumer and adminClient
     val topic2 = "topic2"
     val tp2 = new TopicPartition(topic2, 0)
+
     setReadAndWriteAcls(tp2)
-    sendRecords(producer, numRecords, tp2)
+    // in idempotence producer, we need to create another producer because the previous one is in FATEL_ERROR state (due to authorization error)

Review comment:
       `FATEL_ERROR` -> `FATAL_ERROR`
   Do we really expect the idempotent producer to not be usable anymore?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
##########
@@ -170,6 +170,7 @@ public void configure(final WorkerConfig config) {
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false); // disable idempotence since retries is force to 0

Review comment:
      Ah :( I assumed Connect was benefiting from idempotency automatically, but it looks like that isn't the case

##########
File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##########
@@ -2383,6 +2384,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def buildTransactionalProducer(): KafkaProducer[Array[Byte], Array[Byte]] = {
     producerConfig.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
+    producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")

Review comment:
       Do we need to explicitly set it?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

