dmvk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1182229885


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java:
##########
@@ -74,6 +83,11 @@ public ErrorInfo(@Nonnull Throwable exception, long 
timestamp) {
                         ? (SerializedThrowable) exception
                         : new SerializedThrowable(exception);
         this.timestamp = timestamp;
+        this.labels = labels;

Review Comment:
   nit
   ```suggestion
           this.labels = Preconditions.checkNotNull(labels);
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##########
@@ -107,18 +113,23 @@ public static RootExceptionHistoryEntry 
fromExceptionHistoryEntry(
     public static RootExceptionHistoryEntry fromGlobalFailure(ErrorInfo 
errorInfo) {
         Preconditions.checkNotNull(errorInfo, "errorInfo");
         return fromGlobalFailure(
-                errorInfo.getException(), errorInfo.getTimestamp(), 
Collections.emptyList());
+                errorInfo.getException(),
+                errorInfo.getTimestamp(),
+                errorInfo.getLabels(),
+                Collections.emptyList());
     }
 
     private static RootExceptionHistoryEntry createRootExceptionHistoryEntry(
             Throwable cause,
             long timestamp,
+            Map<String, String> labels,

Review Comment:
   the use of nullable seems inconsistent 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1132,6 +1159,16 @@ private void runRequestNextInputSplitTest(
                             expectedRemainingInputSplits
                                     .apply(inputSplitsPerTask)
                                     .toArray(EMPTY_TESTING_INPUT_SPLITS));
+
+            // Make sure FailureEnrichers are triggered for the above failure
+            assertThat(testingEnricher.seenThrowable.stream().map(t -> 
t.getMessage()))

Review Comment:
   Why is the failure enrichment tested in `testRequestNextInputSplit`? We 
should have a dedicated test for new features/regressions.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java:
##########
@@ -66,6 +70,11 @@ public static Throwable handleMissingThrowable(@Nullable 
Throwable throwable) {
     }
 
     public ErrorInfo(@Nonnull Throwable exception, long timestamp) {
+        this(exception, timestamp, null);
+    }
+
+    public ErrorInfo(
+            @Nonnull Throwable exception, long timestamp, @Nullable 
Map<String, String> labels) {

Review Comment:
   The use of nullable seems to be inconsistent in this class. Would it make 
sense to use an empty map instead of nulls?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryMatcher.java:
##########
@@ -87,6 +107,18 @@ protected boolean matchesSafely(
             match = false;
         }
 
+        if (exceptionHistoryEntry.getLabels() == null) {
+            if (expectedLabels != null) {
+                description.appendText(" actualLabels=null");
+                match = false;
+            }
+        } else if (exceptionHistoryEntry.getLabels().equals(expectedLabels)) {
+            description
+                    .appendText(" actualLabels=")
+                    
.appendText(String.valueOf(exceptionHistoryEntry.getLabels()));
+            match = false;

Review Comment:
   I'm not able to wrap my head around this. How come we don't have a match if 
the actual labels are equal to what is expected?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java:
##########
@@ -139,6 +144,11 @@ public JobMasterBuilder 
withFatalErrorHandler(FatalErrorHandler fatalErrorHandle
         return this;
     }
 
+    public JobMasterBuilder withFailureEnriches(Collection<FailureEnricher> 
failureEnrichers) {

Review Comment:
   ```suggestion
       public JobMasterBuilder withFailureEnrichers(Collection<FailureEnricher> 
failureEnrichers) {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java:
##########
@@ -122,6 +123,7 @@ private JobMasterService internalCreateJobMasterService(
                         DefaultExecutionDeploymentReconciler::new,
                         BlocklistUtils.loadBlocklistHandlerFactory(
                                 jobMasterConfiguration.getConfiguration()),
+                        Collections.emptySet(),

Review Comment:
   Are we missing something here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to