cadonna commented on a change in pull request #8876:
URL: https://github.com/apache/kafka/pull/8876#discussion_r440801026



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -369,6 +369,7 @@ public static StreamThread create(final 
InternalTopologyBuilder builder,
 
         final Consumer<byte[], byte[]> mainConsumer = 
clientSupplier.getConsumer(consumerConfigs);
         changelogReader.setMainConsumer(mainConsumer);
+        changelogReader.setAdminClient(adminClient);

Review comment:
       Q: Why not pass the admin client in the constructor?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -273,13 +273,13 @@ public boolean isRunning() {
     private volatile ThreadMetadata threadMetadata;
     private StreamThread.StateListener stateListener;
 
-    private final Admin adminClient;
     private final ChangelogReader changelogReader;
 
     // package-private for testing
     final ConsumerRebalanceListener rebalanceListener;
     final Consumer<byte[], byte[]> mainConsumer;
     final Consumer<byte[], byte[]> restoreConsumer;
+    final Admin adminClient;

Review comment:
       You mean `private`, right? It is not `private` anymore.
   Instead of making the `adminClient` field package-private for testing, I 
would do one of the following: add a setter `setAdmin()` to 
`MockClientSupplier`; instantiate the admin in a private field of 
`MockClientSupplier` and use the existing getter to set it up; or instantiate 
the admin in a public field of `MockClientSupplier` and access it directly to 
set it up (similar to the producer and consumer).
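
   A rough sketch of the second option (private field plus getter), using 
stand-in types instead of the real Kafka classes. `FakeAdmin`, 
`SketchClientSupplier`, `SketchThread`, and `setEndOffset()` are hypothetical 
names for illustration only; the actual `MockClientSupplier` and 
`MockAdminClient` APIs may look different.

```java
import java.util.HashMap;
import java.util.Map;

public class MockSupplierSketch {

    // Hypothetical stand-in for an admin mock; NOT the real MockAdminClient.
    static class FakeAdmin {
        private final Map<String, Long> endOffsets = new HashMap<>();

        void setEndOffset(final String partition, final long offset) {
            endOffsets.put(partition, offset);
        }

        Long endOffset(final String partition) {
            return endOffsets.get(partition);
        }
    }

    // Hypothetical stand-in for MockClientSupplier: the admin lives in a
    // private field and is handed out through a getter.
    static class SketchClientSupplier {
        private final FakeAdmin admin = new FakeAdmin();

        FakeAdmin getAdmin() {
            return admin;
        }
    }

    // Hypothetical stand-in for the class under test. Its admin field can
    // stay private because tests reach the admin through the supplier.
    static class SketchThread {
        private final FakeAdmin adminClient;

        SketchThread(final SketchClientSupplier supplier) {
            this.adminClient = supplier.getAdmin();
        }

        Long endOffsetFor(final String partition) {
            return adminClient.endOffset(partition);
        }
    }

    public static void main(final String[] args) {
        final SketchClientSupplier supplier = new SketchClientSupplier();
        final SketchThread thread = new SketchThread(supplier);

        // The test prepares the admin via the supplier, not via the thread.
        supplier.getAdmin().setEndOffset("changelog-0", 10L);

        System.out.println(thread.endOffsetFor("changelog-0"));
    }
}
```

   With this shape the test never needs to read `thread.adminClient`, so the 
field in the production class can remain `private`.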

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -564,8 +576,15 @@ private void restoreChangelog(final ChangelogMetadata 
changelogMetadata) {
             return Collections.emptyMap();
 
         try {
-            return restoreConsumer.endOffsets(partitions);
-        } catch (final TimeoutException e) {
+            if (adminClient != null) {

Review comment:
       I also do not understand the distinction. When would we want to call 
`restoreConsumer.endOffsets(partitions)`?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -564,8 +576,15 @@ private void restoreChangelog(final ChangelogMetadata 
changelogMetadata) {
             return Collections.emptyMap();
 
         try {
-            return restoreConsumer.endOffsets(partitions);
-        } catch (final TimeoutException e) {
+            if (adminClient != null) {

Review comment:
       req: Could you add a test (or adapt an existing one) that verifies that 
`restore()` calls `adminClient.listOffsets()` with isolation level 
`READ_UNCOMMITTED`?
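
   One possible shape for such a check, sketched with stand-in types rather 
than the real Kafka classes. The `IsolationLevel` enum here only mirrors the 
Kafka one, and `RecordingAdmin`, `SketchReader`, and the simplified 
`listOffsets()` signature are assumptions made for illustration; a real test 
would use the project's existing mocks and the actual `Admin` API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ListOffsetsVerificationSketch {

    // Stand-in that only mirrors org.apache.kafka.common.IsolationLevel.
    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    // Hypothetical recording fake: remembers the isolation level of each call.
    // The signature is simplified and is NOT the real Admin#listOffsets.
    static class RecordingAdmin {
        final List<IsolationLevel> listOffsetsCalls = new ArrayList<>();

        Map<String, Long> listOffsets(final Set<String> partitions,
                                      final IsolationLevel level) {
            listOffsetsCalls.add(level);
            final Map<String, Long> result = new HashMap<>();
            for (final String partition : partitions) {
                result.put(partition, 0L); // dummy end offset
            }
            return result;
        }
    }

    // Hypothetical stand-in for the changelog reader under test.
    static class SketchReader {
        private final RecordingAdmin admin;

        SketchReader(final RecordingAdmin admin) {
            this.admin = admin;
        }

        Map<String, Long> restore(final Set<String> partitions) {
            // Behavior the test should pin down: end offsets are fetched
            // through the admin with READ_UNCOMMITTED.
            return admin.listOffsets(partitions, IsolationLevel.READ_UNCOMMITTED);
        }
    }

    public static void main(final String[] args) {
        final RecordingAdmin admin = new RecordingAdmin();
        final SketchReader reader = new SketchReader(admin);

        reader.restore(Collections.singleton("changelog-0"));

        // Verification step: exactly one call, made with READ_UNCOMMITTED.
        final boolean ok = admin.listOffsetsCalls
            .equals(Collections.singletonList(IsolationLevel.READ_UNCOMMITTED));
        System.out.println(ok ? "verified" : "unexpected calls: " + admin.listOffsetsCalls);
    }
}
```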

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##########
@@ -1634,6 +1634,7 @@ public void 
shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() t
         final StreamThread thread = createStreamThread("clientId", config, 
false);
         final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.mainConsumer;
         final MockConsumer<byte[], byte[]> mockRestoreConsumer = 
(MockConsumer<byte[], byte[]>) thread.restoreConsumer;
+        final MockAdminClient mockAdminClient = (MockAdminClient) 
thread.adminClient;

Review comment:
       See my comment in `StreamThread`.




