tillrohrmann commented on a change in pull request #15553:
URL: https://github.com/apache/flink/pull/15553#discussion_r611437990



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
##########
@@ -248,10 +248,14 @@ public void close() throws Exception {
     @Override
     public void processResourceRequirements(ResourceRequirements 
resourceRequirements) {
         checkInit();
-        LOG.debug(
-                "Received resource requirements from job {}: {}",
-                resourceRequirements.getJobId(),
-                resourceRequirements.getResourceRequirements());
+        if (resourceRequirements.getResourceRequirements().isEmpty()) {
+            LOG.info("Clearing resource requirements from job {}", 
resourceRequirements.getJobId());

Review comment:
       from -> of?

##########
File path: flink-dist/src/main/resources/flink-conf.yaml
##########
@@ -145,7 +145,7 @@ jobmanager.execution.failover-strategy: region
 
 # The address to which the REST client will connect to
 #
-#rest.address: 0.0.0.0
+rest.address: localhost

Review comment:
       Assuming this is ok, do we have to adjust our documentation to point 
this out now?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -116,10 +117,25 @@ private ExternalResourceUtils() {
         return externalResourceConfigs;
     }
 
+    /**
+     * Instantiate {@link StaticExternalResourceInfoProvider} for all of 
enabled external resources.
+     */
+    public static ExternalResourceInfoProvider 
createStaticExternalResourceInfoProviderFromConfig(
+            Configuration configuration, PluginManager pluginManager) {
+
+        final Map<String, Long> externalResourceAmountMap =
+                getExternalResourceAmountMap(configuration);
+        LOG.info("Enabled external resources: {}", 
externalResourceAmountMap.keySet());

Review comment:
       What about `getExternalResourceConfigurationKeys`? This method also logs 
the same statement. Admittedly this is the statement logged on the `JobManager` 
process and this one here on the `TM`. Maybe we can change what is logged to 
make the distinction a bit clearer.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java
##########
@@ -125,7 +127,13 @@ public T next() throws IOException {
                 try {
                     response = sendRequest(buffer.getVersion(), requestOffset);
                 } catch (Exception e) {
-                    LOG.warn("An exception occurs when fetching query 
results", e);
+                    if (ExceptionUtils.findThrowable(
+                                    e, 
UnavailableDispatcherOperationException.class)
+                            .isPresent()) {
+                        LOG.debug("The job execution has not started yet; 
cannot fetch results.");

Review comment:
       I am wondering how we can guard this behaviour against future changes. 
For example, if we change the type of thrown exception at some point. Will we 
notice that we also need to change this line? We don't have to address this 
comment for this PR, though.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphHandler.java
##########
@@ -204,7 +204,7 @@ public SerializedInputSplit requestNextInputSplit(
 
         final InputSplit nextInputSplit = execution.getNextInputSplit();
 
-        if (log.isDebugEnabled()) {
+        if (nextInputSplit != null) {
             log.debug("Send next input split {}.", nextInputSplit);
         }

Review comment:
       Where is it logged that there are no more input splits?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java
##########
@@ -125,7 +127,13 @@ public T next() throws IOException {
                 try {
                     response = sendRequest(buffer.getVersion(), requestOffset);
                 } catch (Exception e) {
-                    LOG.warn("An exception occurs when fetching query 
results", e);
+                    if (ExceptionUtils.findThrowable(
+                                    e, 
UnavailableDispatcherOperationException.class)

Review comment:
       With this change we don't log the exception while the `DispatcherJob` is 
still being created. However, we start logging it once it is created and while 
the `JobManagerRunner` is waiting for the leadership (I guess it will manifest 
as some kind of `TimeoutException`).
   
   I guess this change is still a valid improvement, though.

##########
File path: flink-dist/src/main/resources/flink-conf.yaml
##########
@@ -145,7 +145,7 @@ jobmanager.execution.failover-strategy: region
 
 # The address to which the REST client will connect to
 #
-#rest.address: 0.0.0.0
+rest.address: localhost

Review comment:
       Doesn't this now mean that the user has to change two config options if 
she wants to start the `JobManager` on a machine using the standalone setup?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to