Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4647

Tried it out and it works as expected.

After looking at the aggregation code in detail, I suggest setting an isComplete boolean for each counter instead of setting the value to -1. This makes things more explicit and preserves the existing behavior of happily aggregating the counters. It also simplifies the addition of bytesInLocal/-Remote.

I.e., the aggregation part looks like this:

```
String numBytesInRemoteString = metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE);
if (numBytesInRemoteString == null) {
    // Metric not reported: flag the total as incomplete, but keep aggregating.
    this.numBytesInRemoteComplete = false;
} else {
    this.numBytesInRemote += Long.valueOf(numBytesInRemoteString);
}
```

and the writing like this:

```
public void writeIOMetricsAsJson(JsonGenerator gen) throws IOException {
    gen.writeObjectFieldStart("metrics");

    // read-bytes is only complete if both the local and the remote counter are.
    long numBytesIn = this.numBytesInLocal + this.numBytesInRemote;
    writeIOMetricWithCompleteness(gen, "read-bytes", numBytesIn, this.numBytesInLocalComplete && this.numBytesInRemoteComplete);
    writeIOMetricWithCompleteness(gen, "write-bytes", this.numBytesOut, this.numBytesOutComplete);
    writeIOMetricWithCompleteness(gen, "read-records", this.numRecordsIn, this.numRecordsInComplete);
    writeIOMetricWithCompleteness(gen, "write-records", this.numRecordsOut, this.numRecordsOutComplete);

    gen.writeEndObject();
}

private void writeIOMetricWithCompleteness(JsonGenerator gen, String fieldName, long fieldValue, boolean isComplete) throws IOException {
    gen.writeNumberField(fieldName, fieldValue);
    gen.writeBooleanField(fieldName + "-complete", isComplete);
}
```

What do you think @jameslafa ?
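To show the behavior in isolation, here is a minimal self-contained sketch of the completeness-flag idea, reduced to the two read-bytes counters. Everything in it is an assumption made for illustration: the class `IoMetricsSketch`, the plain `Map<String, String>` standing in for the metric store, the key strings, and the hand-built JSON string that replaces `JsonGenerator`. It is not the code from the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the aggregation discussed above; not Flink's actual class.
class IoMetricsSketch {
    // Illustrative keys only; the PR code reads its keys from MetricNames.
    static final String BYTES_IN_LOCAL = "numBytesInLocal";
    static final String BYTES_IN_REMOTE = "numBytesInRemote";

    long numBytesInLocal;
    long numBytesInRemote;
    boolean numBytesInLocalComplete = true;
    boolean numBytesInRemoteComplete = true;

    // Fold one subtask's metrics into the running totals.
    // A missing metric clears the matching flag but leaves the sum untouched.
    void aggregate(Map<String, String> subtaskMetrics) {
        String local = subtaskMetrics.get(BYTES_IN_LOCAL);
        if (local == null) {
            numBytesInLocalComplete = false;
        } else {
            numBytesInLocal += Long.parseLong(local);
        }

        String remote = subtaskMetrics.get(BYTES_IN_REMOTE);
        if (remote == null) {
            numBytesInRemoteComplete = false;
        } else {
            numBytesInRemote += Long.parseLong(remote);
        }
    }

    long readBytes() {
        return numBytesInLocal + numBytesInRemote;
    }

    // The combined value is only complete if both inputs are.
    boolean readBytesComplete() {
        return numBytesInLocalComplete && numBytesInRemoteComplete;
    }

    // Hand-rolled JSON to keep the sketch dependency-free.
    String toJson() {
        return "{\"read-bytes\":" + readBytes()
                + ",\"read-bytes-complete\":" + readBytesComplete() + "}";
    }

    // Two subtasks; the second one has not reported its remote counter.
    static String demo() {
        IoMetricsSketch agg = new IoMetricsSketch();

        Map<String, String> first = new HashMap<>();
        first.put(BYTES_IN_LOCAL, "100");
        first.put(BYTES_IN_REMOTE, "50");
        agg.aggregate(first);

        Map<String, String> second = new HashMap<>();
        second.put(BYTES_IN_LOCAL, "25");
        agg.aggregate(second);

        return agg.toJson();
    }
}
```

Calling `IoMetricsSketch.demo()` returns `{"read-bytes":175,"read-bytes-complete":false}`: the partial sum is still reported, and the flag tells the consumer not to treat it as final. With a -1 sentinel the partial sum would be lost.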
---