NicoK closed pull request #6547: [FLINK-10131][network] improve logging around subpartitions
URL: https://github.com/apache/flink/pull/6547
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index cc793635047..c6f3e158519 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -80,7 +80,7 @@ public void flush() { @Override public void finish() throws IOException { add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true); - LOG.debug("Finished {}.", this); + LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this); } private boolean add(BufferConsumer bufferConsumer, boolean finish) { @@ -132,7 +132,7 @@ public void release() { isReleased = true; } - LOG.debug("Released {}.", this); + LOG.debug("{}: Released {}.", parent.getOwningTaskName(), this); if (view != null) { view.releaseAllResources(); @@ -224,7 +224,8 @@ public PipelinedSubpartitionView createReadView(BufferAvailabilityListener avail "Subpartition %s of is being (or already has been) consumed, " + "but pipelined subpartitions can only be consumed once.", index, parent.getPartitionId()); - LOG.debug("Creating read view for subpartition {} of partition {}.", index, parent.getPartitionId()); + LOG.debug("{}: Creating read view for subpartition {} of partition {}.", + parent.getOwningTaskName(), index, parent.getPartitionId()); readView = new PipelinedSubpartitionView(this, availabilityListener); if (!buffers.isEmpty()) { @@ -268,8 +269,8 @@ public String toString() { } return String.format( - "PipelinedSubpartition [number of 
buffers: %d (%d bytes), number of buffers in backlog: %d, finished? %s, read view? %s]", - numBuffers, numBytes, getBuffersInBacklog(), finished, hasReadView); + "PipelinedSubpartition#%d [number of buffers: %d (%d bytes), number of buffers in backlog: %d, finished? %s, read view? %s]", + index, numBuffers, numBytes, getBuffersInBacklog(), finished, hasReadView); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index fbbfa4b45bb..93e5ba15097 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -199,6 +199,10 @@ public JobID getJobId() { return jobId; } + public String getOwningTaskName() { + return owningTaskName; + } + public ResultPartitionID getPartitionId() { return partitionId; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java index 69b461b1a4d..9f696adc362 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java @@ -143,6 +143,7 @@ public synchronized void finish() throws IOException { if (spillWriter != null) { spillWriter.close(); } + LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this); } @Override @@ -180,6 +181,8 @@ public synchronized void release() throws IOException { isReleased = true; } + LOG.debug("{}: Released {}.", parent.getOwningTaskName(), this); + if (view != null) { view.releaseAllResources(); } @@ -236,8 +239,8 @@ public int releaseMemory() throws IOException { long spilledBytes = 
spillFinishedBufferConsumers(isFinished); int spilledBuffers = numberOfBuffers - buffers.size(); - LOG.debug("Spilling {} bytes ({} buffers} for sub partition {} of {}.", - spilledBytes, spilledBuffers, index, parent.getPartitionId()); + LOG.debug("{}: Spilling {} bytes ({} buffers} for sub partition {} of {}.", + parent.getOwningTaskName(), spilledBytes, spilledBuffers, index, parent.getPartitionId()); return spilledBuffers; } @@ -300,9 +303,9 @@ public int unsynchronizedGetNumberOfQueuedBuffers() { @Override public String toString() { - return String.format("SpillableSubpartition [%d number of buffers (%d bytes)," + + return String.format("SpillableSubpartition#%d [%d number of buffers (%d bytes)," + "%d number of buffers in backlog, finished? %s, read view? %s, spilled? %s]", - getTotalNumberOfBuffers(), getTotalNumberOfBytes(), + index, getTotalNumberOfBuffers(), getTotalNumberOfBytes(), getBuffersInBacklog(), isFinished, readView != null, spillWriter != null); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services