rmetzger commented on a change in pull request #14394:
URL: https://github.com/apache/flink/pull/14394#discussion_r544037229



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
##########
@@ -164,6 +169,7 @@ public Void restore() throws Exception {
                                        kvStatesById, restoredMetaInfos.size(),
                                        serializationProxy.getReadVersion(),
                                        
serializationProxy.isUsingKeyGroupCompression());
+                               LOG.info("Finish to restore from state handle: 
{}.", keyedStateHandle);

Review comment:
       ```suggestion
                                LOG.info("Finished restoring from state handle: 
{}.", keyedStateHandle);
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
##########
@@ -112,6 +112,7 @@ public HeapKeyedStateBackendBuilder(
                        keyContext);
                try {
                        restoreOperation.restore();
+                       logger.info("Finish to build heap keyed 
state-backend.");

Review comment:
       ```suggestion
                        logger.info("Finished to build heap keyed 
state-backend.");
   ```

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
##########
@@ -162,11 +162,13 @@ public RocksDBRestoreResult restore()
        private void restoreKeyGroupsInStateHandle()
                throws IOException, StateMigrationException, RocksDBException {
                try {
+                       logger.info("Start to restore from state handle: {}.", 
currentKeyGroupsStateHandle);
                        currentStateHandleInStream = 
currentKeyGroupsStateHandle.openInputStream();
                        
cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
                        currentStateHandleInView = new 
DataInputViewStreamWrapper(currentStateHandleInStream);
                        restoreKVStateMetaData();
                        restoreKVStateData();
+                       logger.info("Finish to restore from state handle: {}.", 
currentKeyGroupsStateHandle);

Review comment:
       ```suggestion
                        logger.info("Finished restoring from state handle: 
{}.", currentKeyGroupsStateHandle);
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
##########
@@ -122,6 +126,7 @@ public Void restore() throws Exception {
                                throw 
unexpectedStateHandleException(KeyGroupsStateHandle.class, 
keyedStateHandle.getClass());
                        }
 
+                       LOG.info("Start to restore from state handle: {}.", 
keyedStateHandle);

Review comment:
       ```suggestion
                        LOG.info("Starting to restore from state handle: {}.", 
keyedStateHandle);
   ```

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
##########
@@ -317,21 +314,22 @@ private static void checkAndCreateDirectory(File 
directory) throws IOException {
                        try {
                                FileUtils.deleteDirectory(instanceBasePath);
                        } catch (Exception ex) {
-                               LOG.warn("Failed to instance base path for 
RocksDB: " + instanceBasePath, ex);
+                               logger.warn("Failed to instance base path for 
RocksDB: " + instanceBasePath, ex);

Review comment:
       ```suggestion
                                logger.warn("Failed to delete base path for 
RocksDB: " + instanceBasePath, ex);
   ```
   
   Not sure, maybe I don't understand what the code is doing here.

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
##########
@@ -317,21 +314,22 @@ private static void checkAndCreateDirectory(File 
directory) throws IOException {
                        try {
                                FileUtils.deleteDirectory(instanceBasePath);
                        } catch (Exception ex) {
-                               LOG.warn("Failed to instance base path for 
RocksDB: " + instanceBasePath, ex);
+                               logger.warn("Failed to instance base path for 
RocksDB: " + instanceBasePath, ex);
                        }
                        // Log and rethrow
                        if (e instanceof BackendBuildingException) {
                                throw (BackendBuildingException) e;
                        } else {
                                String errMsg = "Caught unexpected exception.";
-                               LOG.error(errMsg, e);
+                               logger.error(errMsg, e);
                                throw new BackendBuildingException(errMsg, e);
                        }
                }
                InternalKeyContext<K> keyContext = new InternalKeyContextImpl<>(
                        keyGroupRange,
                        numberOfKeyGroups
                );
+               logger.info("Finish to build RocksDB keyed state-backend at 
{}.", instanceBasePath);

Review comment:
       ```suggestion
                logger.info("Finished building RocksDB keyed state-backend at 
{}.", instanceBasePath);
   ```

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
##########
@@ -330,6 +330,7 @@ private void 
restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandl
                                                }
                                        } // releases native iterator resources
                                }
+                               logger.info("Finish to restore from state 
handle: {} with rescaling.", rawStateHandle);

Review comment:
       ```suggestion
                                logger.info("Finished restoring from state 
handle: {} with rescaling.", rawStateHandle);
   ```

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
##########
@@ -162,11 +162,13 @@ public RocksDBRestoreResult restore()
        private void restoreKeyGroupsInStateHandle()
                throws IOException, StateMigrationException, RocksDBException {
                try {
+                       logger.info("Start to restore from state handle: {}.", 
currentKeyGroupsStateHandle);

Review comment:
       ```suggestion
                        logger.info("Starting to restore from state handle: 
{}.", currentKeyGroupsStateHandle);
   ```

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
##########
@@ -295,6 +294,7 @@ private void 
restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandl
                                throw 
unexpectedStateHandleException(IncrementalRemoteKeyedStateHandle.class, 
rawStateHandle.getClass());
                        }
 
+                       logger.info("Start to restore from state handle: {} 
with rescaling.", rawStateHandle);

Review comment:
       ```suggestion
                        logger.info("Starting to restore from state handle: {} 
with rescaling.", rawStateHandle);
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to