Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5308#discussion_r163538231
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -752,16 +751,21 @@ public Executor getFutureExecutor() {
        /**
         * Gets a serialized accumulator map.
         * @return The accumulator map with serialized accumulator values.
    -    * @throws IOException
         */
        @Override
    -   public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException {
    +   public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() {
     
                Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
     
                Map<String, SerializedValue<Object>> result = new HashMap<>(accumulatorMap.size());
                for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
    -                   result.put(entry.getKey(), new SerializedValue<>(entry.getValue().getLocalValue()));
    +
    +                   try {
    +                           final SerializedValue<Object> serializedValue = new SerializedValue<>(entry.getValue().getLocalValue());
    +                           result.put(entry.getKey(), serializedValue);
    +                   } catch (IOException ioe) {
    +                           LOG.info("Could not serialize accumulator " + entry.getKey() + '.', ioe);
    --- End diff --
    
    Why is it acceptable to change the behavior here, i.e., to swallow the exception instead of propagating it?
    It is not even logged at `ERROR` level.
    Also, use a parameterized message instead of string concatenation:
    ```
    LOG.info("Could not serialize accumulator {}.", entry.getKey(), ioe);
    ```
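
To make the concern about the behavior change concrete, here is a minimal sketch of the two variants. It is not the Flink code: the class and method names are hypothetical, plain Java serialization stands in for `new SerializedValue<>(...)`, and `System.err` stands in for `LOG`. It only illustrates that the catch-and-log variant returns a map with entries silently missing, whereas the old signature forced the caller to deal with the failure.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the pattern in the diff; not Flink's ExecutionGraph.
final class AccumulatorSerializationSketch {

    // Assumption: plain Java serialization replaces `new SerializedValue<>(value)`.
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Old behavior: the first serialization failure propagates to the caller.
    static Map<String, byte[]> serializeStrict(Map<String, Object> values) throws IOException {
        Map<String, byte[]> result = new HashMap<>(values.size());
        for (Map.Entry<String, Object> entry : values.entrySet()) {
            result.put(entry.getKey(), serialize(entry.getValue()));
        }
        return result;
    }

    // New behavior: a failure is only reported, and the entry is left out of the result.
    static Map<String, byte[]> serializeLenient(Map<String, Object> values) {
        Map<String, byte[]> result = new HashMap<>(values.size());
        for (Map.Entry<String, Object> entry : values.entrySet()) {
            try {
                result.put(entry.getKey(), serialize(entry.getValue()));
            } catch (IOException ioe) {
                // Stand-in for the LOG call in the diff.
                System.err.println("Could not serialize accumulator " + entry.getKey() + ": " + ioe);
            }
        }
        return result;
    }
}
```

With one serializable and one non-serializable value, `serializeStrict` throws, while `serializeLenient` returns a map containing only the serializable entry. A caller of the lenient variant cannot tell an accumulator that failed to serialize from one that never existed.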

