rkhachatryan commented on a change in pull request #16171:
URL: https://github.com/apache/flink/pull/16171#discussion_r654542405



##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -142,6 +142,39 @@ public RocksIncrementalSnapshotStrategy(
         this.localDirectoryName = backendUID.toString().replaceAll("[\\-]", 
"");
     }
 
+    public RocksIncrementalSnapshotStrategy(

Review comment:
       Can we use telescopic constructor here?
   Or even better replace the old constructor with the new one, moving uploader 
creation to the outside?

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
##########
@@ -210,7 +211,7 @@ public void setupRocksKeyedStateBackend() throws Exception {
         testStreamFactory.setBlockerLatch(blocker);
         testStreamFactory.setWaiterLatch(waiter);
         testStreamFactory.setAfterNumberInvocations(10);
-
+        rocksDBStateUploader = spy(new RocksDBStateUploader(4));

Review comment:
       I think it's better to move `if enableIncrementalCheckpointing` check 
here from `verifyRocksDBStateUploaderClosed` so that the uploader isn't created 
unnecessarily.
   
   We could also use 
`RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue()` instead of `4` 
here.

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
##########
@@ -498,20 +505,36 @@ private RocksDBRestoreOperation 
getRocksDBRestoreOperation(
         RocksDBSnapshotStrategyBase<K, ?> checkpointSnapshotStrategy;
         if (enableIncrementalCheckpointing) {
             checkpointSnapshotStrategy =
-                    new RocksIncrementalSnapshotStrategy<>(
-                            db,
-                            rocksDBResourceGuard,
-                            keySerializerProvider.currentSchemaSerializer(),
-                            kvStateInformation,
-                            keyGroupRange,
-                            keyGroupPrefixBytes,
-                            localRecoveryConfig,
-                            cancelStreamRegistry,
-                            instanceBasePath,
-                            backendUID,
-                            materializedSstFiles,
-                            lastCompletedCheckpointId,
-                            numberOfTransferingThreads);
+                    injectRocksDBStateUploader != null
+                            ? new RocksIncrementalSnapshotStrategy<>(

Review comment:
       There are two fields now in builder: `injectRocksDBStateUploader` and 
`numberOfTransferingThreads`. But if the former is set then the latter is 
ignored.
   
   I think the more robust solution would be to  throw an exception in 
`setNumberOfTransferingThreads` if the uploader is not null. Otherwise, we'd 
have to close the old uploader, which is not something I'd expect from a 
builder setter method.
   
   WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to