vvcephei commented on a change in pull request #8738:
URL: https://github.com/apache/kafka/pull/8738#discussion_r431966350



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
##########
@@ -40,9 +41,14 @@
 import org.slf4j.LoggerFactory;
 
 public class ClientUtils {
-
     private static final Logger LOG = 
LoggerFactory.getLogger(ClientUtils.class);
 
+    static final class InternalAdminClientConfig extends AdminClientConfig {

Review comment:
       How about QuietAdminClientConfig, like QuietStreamsConfig?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
##########
@@ -90,24 +96,21 @@ public static String getTaskProducerClientId(final String 
threadClientId, final
         return result;
     }
 
-    public static Map<TopicPartition, ListOffsetsResultInfo> 
fetchEndOffsetsWithoutTimeout(final Collection<TopicPartition> partitions,
-                                                                               
            final Admin adminClient) {
-        return fetchEndOffsets(partitions, adminClient, null);
+    public static int getAdminDefaultApiTimeoutMs(final StreamsConfig 
streamsConfig) {
+        final InternalAdminClientConfig dummyAdmin = new 
InternalAdminClientConfig(streamsConfig.getAdminConfigs("dummy"));
+        return 
dummyAdmin.getInt(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
     }
 
     public static Map<TopicPartition, ListOffsetsResultInfo> 
fetchEndOffsets(final Collection<TopicPartition> partitions,
                                                                              
final Admin adminClient,
-                                                                             
final Duration timeout) {
+                                                                             
final long timeoutMs) {

Review comment:
       I think it's because we're now also calling it right after calling 
`getAdminDefaultApiTimeoutMs`, so it seems a bummer to create a Duration from 
millis and then immediately convert it back to millis. 

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
##########
@@ -89,7 +89,7 @@ public void shouldProperlyConfigureTheAssignor() throws 
NoSuchFieldException, Il
                 mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
                 mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 
"480000"),
                 mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, 
configuredAssignmentListener),
-                mkEntry(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 9)
+                mkEntry(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 
90_000)

Review comment:
       Just curious, why the change from 9 to 90,000?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to