This is an automated email from the ASF dual-hosted git repository. nicholasjiang pushed a commit to branch branch-0.6 in repository https://gitbox.apache.org/repos/asf/celeborn.git
commit 4d989f425946aacf7a9ff240c3b4aacc2b31dfac Author: SteNicholas <[email protected]> AuthorDate: Fri May 23 16:25:40 2025 +0800 [CELEBORN-2005] Introduce numBytesIn, numBytesOut, numBytesInPerSecond, numBytesOutPerSecond metrics for RemoteShuffleServiceFactory ### What changes were proposed in this pull request? Introduce `numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`, `numBytesOutPerSecond`, `numRecordsOutPerSecond` metrics for `RemoteShuffleServiceFactory`. Scope | Infix | Metrics | Description | Type -- | -- | -- | -- | -- Task | Shuffle.Remote.[ShuffleId] | numBytesIn | The total number of bytes this shuffle has read. | Counter | Task | Shuffle.Remote.[ShuffleId] | numBytesOut | The total number of bytes this shuffle has written. | Counter | Task | Shuffle.Remote.[ShuffleId] | numRecordsOut | The total number of records this shuffle has written. | Counter | Task | Shuffle.Remote.[ShuffleId] | numBytesInPerSecond | The number of bytes this shuffle reads per second. | Meter | Task | Shuffle.Remote.[ShuffleId] | numBytesOutPerSecond | The number of bytes this shuffle writes per second. | Meter | Task | Shuffle.Remote.[ShuffleId] | numRecordsOutPerSecond | The number of records this shuffle writes per second. | Meter | Note: - `numBytesIn` and `numBytesOut` metrics include the total number of bytes for records and events. - `numRecordsOut` metric includes only the total number of records, not records and events. ### Why are the changes needed? There are currently no metrics for shuffle read operations or shuffle write operations in the Flink shuffle client. It's proposed to introduce `numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`, `numBytesOutPerSecond`, `numRecordsOutPerSecond` metrics for `RemoteShuffleServiceFactory`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? 
- `RemoteShuffleOutputGateSuiteJ#testSimpleWriteData` - `RemoteShuffleResultPartitionSuiteJ` Closes #3272 from SteNicholas/CELEBORN-2005. Authored-by: SteNicholas <[email protected]> Signed-off-by: mingji <[email protected]> --- .../flink/AbstractRemoteShuffleInputGate.java | 5 +- .../AbstractRemoteShuffleInputGateFactory.java | 7 +- ...bstractRemoteShuffleResultPartitionFactory.java | 20 +++--- .../plugin/flink/RemoteShuffleEnvironment.java | 4 +- .../flink/RemoteShuffleInputGateDelegation.java | 22 ++++-- .../plugin/flink/RemoteShuffleOutputGate.java | 46 +++++++++--- .../RemoteShuffleResultPartitionDelegation.java | 8 +++ .../plugin/flink/metric/ShuffleIOMetricGroup.java | 83 ++++++++++++++++++++++ .../flink/RemoteShuffleOutputGateSuiteJ.java | 51 +++++++++++-- .../plugin/flink/RemoteShuffleInputGate.java | 5 +- .../flink/RemoteShuffleInputGateFactory.java | 7 +- .../flink/RemoteShuffleResultPartitionFactory.java | 8 ++- .../flink/RemoteShuffleResultPartitionSuiteJ.java | 52 +++++++++----- .../plugin/flink/RemoteShuffleInputGate.java | 5 +- .../flink/RemoteShuffleInputGateFactory.java | 7 +- .../flink/RemoteShuffleResultPartitionFactory.java | 8 ++- .../flink/RemoteShuffleResultPartitionSuiteJ.java | 16 ++++- .../plugin/flink/RemoteShuffleInputGate.java | 5 +- .../flink/RemoteShuffleInputGateFactory.java | 7 +- .../flink/RemoteShuffleResultPartitionFactory.java | 8 ++- .../flink/RemoteShuffleResultPartitionSuiteJ.java | 16 ++++- .../plugin/flink/RemoteShuffleInputGate.java | 5 +- .../flink/RemoteShuffleInputGateFactory.java | 7 +- .../flink/RemoteShuffleResultPartitionFactory.java | 8 ++- .../flink/RemoteShuffleResultPartitionSuiteJ.java | 16 ++++- .../plugin/flink/RemoteShuffleInputGate.java | 5 +- .../flink/RemoteShuffleInputGateFactory.java | 7 +- .../flink/RemoteShuffleResultPartitionFactory.java | 8 ++- .../flink/RemoteShuffleResultPartitionSuiteJ.java | 16 ++++- .../plugin/flink/RemoteShuffleInputGate.java | 5 +- 
.../flink/RemoteShuffleInputGateFactory.java | 7 +- .../flink/RemoteShuffleResultPartitionFactory.java | 8 ++- .../flink/RemoteShuffleResultPartitionSuiteJ.java | 16 ++++- 33 files changed, 386 insertions(+), 112 deletions(-) diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java index a7330a2cb..d04497536 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.function.SupplierWithException; @@ -45,7 +46,7 @@ public abstract class AbstractRemoteShuffleInputGate extends IndexedInputGate { public AbstractRemoteShuffleInputGate( CelebornConf celebornConf, - String taskName, + ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor gateDescriptor, SupplierWithException<BufferPool, IOException> bufferPoolFactory, @@ -55,7 +56,7 @@ public abstract class AbstractRemoteShuffleInputGate extends IndexedInputGate { inputGateDelegation = new RemoteShuffleInputGateDelegation( celebornConf, - taskName, + ownerContext, gateIndex, gateDescriptor, bufferPoolFactory, diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java index 9568ac546..0c9e2e8bb 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.function.SupplierWithException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,7 +81,7 @@ public abstract class AbstractRemoteShuffleInputGateFactory { /** Create RemoteShuffleInputGate from {@link InputGateDeploymentDescriptor}. 
*/ public IndexedInputGate create( - String owningTaskName, int gateIndex, InputGateDeploymentDescriptor igdd) { + ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor igdd) { LOG.info( "Create input gate -- number of buffers per input gate={}, " + "number of concurrent readings={}.", @@ -91,7 +92,7 @@ public abstract class AbstractRemoteShuffleInputGateFactory { createBufferPoolFactory(networkBufferPool, numBuffersPerGate, supportFloatingBuffers); return createInputGate( - owningTaskName, + ownerContext, gateIndex, igdd, bufferPoolFactory, @@ -99,7 +100,7 @@ public abstract class AbstractRemoteShuffleInputGateFactory { } protected abstract IndexedInputGate createInputGate( - String owningTaskName, + ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor igdd, SupplierWithException<BufferPool, IOException> bufferPoolFactory, diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java index dfde34787..929939e47 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.function.SupplierWithException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,7 +89,7 @@ public 
abstract class AbstractRemoteShuffleResultPartitionFactory { } public ResultPartition create( - String taskNameWithSubtaskAndId, + ShuffleIOOwnerContext ownerContext, int partitionIndex, ResultPartitionDeploymentDescriptor desc, CelebornConf celebornConf) { @@ -100,32 +100,32 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory { desc.getNumberOfSubpartitions()); return create( - taskNameWithSubtaskAndId, + ownerContext, partitionIndex, desc.getShuffleDescriptor().getResultPartitionID(), desc.getPartitionType(), desc.getNumberOfSubpartitions(), desc.getMaxParallelism(), createBufferPoolFactory(), - desc.getShuffleDescriptor(), + (RemoteShuffleDescriptor) desc.getShuffleDescriptor(), celebornConf, desc.getTotalNumberOfPartitions()); } public ResultPartition create( - String taskNameWithSubtaskAndId, + ShuffleIOOwnerContext ownerContext, int partitionIndex, ResultPartitionID id, ResultPartitionType type, int numSubpartitions, int maxParallelism, List<SupplierWithException<BufferPool, IOException>> bufferPoolFactories, - ShuffleDescriptor shuffleDescriptor, + RemoteShuffleDescriptor shuffleDescriptor, CelebornConf celebornConf, int numMappers) { ResultPartition partition = createRemoteShuffleResultPartitionInternal( - taskNameWithSubtaskAndId, + ownerContext, partitionIndex, id, type, @@ -135,13 +135,13 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory { celebornConf, numMappers, getBufferCompressor(), - (RemoteShuffleDescriptor) shuffleDescriptor); - LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, this); + shuffleDescriptor); + LOG.debug("{}: Initialized {}", ownerContext.getOwnerName(), this); return partition; } abstract ResultPartition createRemoteShuffleResultPartitionInternal( - String taskNameWithSubtaskAndId, + ShuffleIOOwnerContext ownerContext, int partitionIndex, ResultPartitionID id, ResultPartitionType type, diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java index 8851cc812..dc197ae3a 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java @@ -191,7 +191,7 @@ public class RemoteShuffleEnvironment if (resultPartitionDeploymentDescriptor.getShuffleDescriptor() instanceof RemoteShuffleDescriptor) { return resultPartitionFactory.create( - ownerContext.getOwnerName(), index, resultPartitionDeploymentDescriptor, conf); + ownerContext, index, resultPartitionDeploymentDescriptor, conf); } else { nettyResultIds.add(resultPartitionDeploymentDescriptor.getResultId()); nettyResultPartitionIds.add(resultPartitionDeploymentDescriptor.getPartitionId()); @@ -246,7 +246,7 @@ public class RemoteShuffleEnvironment ? 
shuffleEnvironmentWrapper .nettyInputGateFactory() .create(ownerContext, gateIndex, igdd, producerStateProvider, inputChannelMetrics) - : inputGateFactory.create(ownerContext.getOwnerName(), gateIndex, igdd); + : inputGateFactory.create(ownerContext, gateIndex, igdd); } @VisibleForTesting diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java index 3a0d9f077..fe871d68c 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java @@ -28,6 +28,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.commons.lang3.tuple.Pair; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.event.AbstractEvent; @@ -43,6 +44,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.function.SupplierWithException; @@ -56,6 +58,7 @@ import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.plugin.flink.buffer.BufferPacker; import org.apache.celeborn.plugin.flink.buffer.TransferBufferPool; import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl; +import 
org.apache.celeborn.plugin.flink.metric.ShuffleIOMetricGroup; import org.apache.celeborn.plugin.flink.utils.BufferUtils; public class RemoteShuffleInputGateDelegation { @@ -130,9 +133,11 @@ public class RemoteShuffleInputGateDelegation { private int endSubIndex; private boolean partitionConnectionExceptionEnabled; + private final MetricGroup taskIOMetricGroup; + public RemoteShuffleInputGateDelegation( CelebornConf celebornConf, - String taskName, + ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor gateDescriptor, SupplierWithException<BufferPool, IOException> bufferPoolFactory, @@ -141,7 +146,8 @@ public class RemoteShuffleInputGateDelegation { AvailabilityProvider.AvailabilityHelper availabilityHelper, int startSubIndex, int endSubIndex) { - this.taskName = taskName; + this.taskName = ownerContext.getOwnerName(); + this.taskIOMetricGroup = ownerContext.getParentGroup(); this.gateIndex = gateIndex; this.gateDescriptor = gateDescriptor; this.bufferPoolFactory = bufferPoolFactory; @@ -198,6 +204,8 @@ public class RemoteShuffleInputGateDelegation { RemoteShuffleDescriptor remoteDescriptor = (RemoteShuffleDescriptor) descriptor.getRight(); ShuffleResourceDescriptor shuffleDescriptor = remoteDescriptor.getShuffleResource().getMapPartitionShuffleDescriptor(); + ShuffleIOMetricGroup shuffleIOMetricGroup = + new ShuffleIOMetricGroup(taskIOMetricGroup, shuffleDescriptor.getShuffleId()); LOG.debug("create shuffle reader for descriptor {}", shuffleDescriptor); @@ -208,7 +216,7 @@ public class RemoteShuffleInputGateDelegation { startSubIndex, endSubIndex, transferBufferPool, - getDataListener(descriptor.getLeft()), + getDataListener(descriptor.getLeft(), shuffleIOMetricGroup), getFailureListener(remoteDescriptor.getResultPartitionID())); bufferReaders.add(reader); @@ -235,13 +243,14 @@ public class RemoteShuffleInputGateDelegation { .collect(Collectors.toList()); } - private Consumer<ByteBuf> getDataListener(int channelIdx) { + private 
Consumer<ByteBuf> getDataListener( + int channelIdx, ShuffleIOMetricGroup shuffleIOMetricGroup) { return byteBuf -> { Queue<Buffer> unpackedBuffers = null; try { unpackedBuffers = BufferPacker.unpack(byteBuf); while (!unpackedBuffers.isEmpty()) { - onBuffer(unpackedBuffers.poll(), channelIdx); + onBuffer(unpackedBuffers.poll(), channelIdx, shuffleIOMetricGroup); } } catch (Throwable throwable) { synchronized (lock) { @@ -279,7 +288,7 @@ public class RemoteShuffleInputGateDelegation { }; } - private void onBuffer(Buffer buffer, int channelIdx) { + private void onBuffer(Buffer buffer, int channelIdx, ShuffleIOMetricGroup shuffleIOMetricGroup) { synchronized (lock) { if (closed || cause != null) { buffer.recycleBuffer(); @@ -293,6 +302,7 @@ public class RemoteShuffleInputGateDelegation { checkState(channelInfo.getInputChannelIdx() == channelIdx, "Illegal channel index."); LOG.debug("ReceivedBuffers is adding buffer {} on {}", buffer, channelInfo); receivedBuffers.add(Pair.of(buffer, channelInfo)); + shuffleIOMetricGroup.getNumBytesIn().inc(buffer.getSize()); needRecycle = false; if (wasEmpty) { availabilityHelper.getUnavailableToResetAvailable().complete(null); diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java index dc131f971..fe25492fd 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Optional; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferPool; @@ 
-36,6 +37,7 @@ import org.apache.celeborn.common.protocol.PartitionLocation; import org.apache.celeborn.plugin.flink.buffer.BufferHeader; import org.apache.celeborn.plugin.flink.buffer.BufferPacker; import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl; +import org.apache.celeborn.plugin.flink.metric.ShuffleIOMetricGroup; import org.apache.celeborn.plugin.flink.utils.BufferUtils; import org.apache.celeborn.plugin.flink.utils.Utils; @@ -83,6 +85,7 @@ public class RemoteShuffleOutputGate { private boolean isRegisterShuffle = false; private int maxReviveTimes; private boolean hasSentHandshake = false; + protected final ShuffleIOMetricGroup shuffleIOMetricGroup; /** * @param shuffleDesc Describes shuffle meta and shuffle worker address. @@ -95,8 +98,28 @@ public class RemoteShuffleOutputGate { int bufferSize, SupplierWithException<BufferPool, IOException> bufferPoolFactory, CelebornConf celebornConf, - int numMappers) { + int numMappers, + MetricGroup taskIOMetricGroup) { + this( + shuffleDesc, + numSubs, + bufferSize, + bufferPoolFactory, + celebornConf, + numMappers, + taskIOMetricGroup, + null); + } + public RemoteShuffleOutputGate( + RemoteShuffleDescriptor shuffleDesc, + int numSubs, + int bufferSize, + SupplierWithException<BufferPool, IOException> bufferPoolFactory, + CelebornConf celebornConf, + int numMappers, + MetricGroup taskIOMetricGroup, + FlinkShuffleClientImpl flinkShuffleClient) { this.shuffleDesc = shuffleDesc; this.numSubs = numSubs; this.bufferPoolFactory = bufferPoolFactory; @@ -116,8 +139,9 @@ public class RemoteShuffleOutputGate { this.lifecycleManagerPort = shuffleDesc.getShuffleResource().getLifecycleManagerPort(); this.lifecycleManagerTimestamp = shuffleDesc.getShuffleResource().getLifecycleManagerTimestamp(); - this.flinkShuffleClient = getShuffleClient(); + this.flinkShuffleClient = flinkShuffleClient == null ? 
getShuffleClient() : flinkShuffleClient; this.maxReviveTimes = celebornConf.clientPushMaxReviveTimes(); + this.shuffleIOMetricGroup = new ShuffleIOMetricGroup(taskIOMetricGroup, shuffleId); } /** Initialize transportation gate. */ @@ -210,14 +234,16 @@ public class RemoteShuffleOutputGate { /** Writes a piece of data to a subpartition. */ public void write(ByteBuf byteBuf, BufferHeader bufferHeader) { try { - flinkShuffleClient.pushDataToLocation( - shuffleId, - mapId, - attemptId, - bufferHeader.getSubPartitionId(), - io.netty.buffer.Unpooled.wrappedBuffer(byteBuf.nioBuffer()), - partitionLocation, - () -> byteBuf.release()); + int bytesWritten = + flinkShuffleClient.pushDataToLocation( + shuffleId, + mapId, + attemptId, + bufferHeader.getSubPartitionId(), + io.netty.buffer.Unpooled.wrappedBuffer(byteBuf.nioBuffer()), + partitionLocation, + byteBuf::release); + shuffleIOMetricGroup.getNumBytesOut().inc(bytesWritten); } catch (IOException e) { Utils.rethrowAsRuntimeException(e); } diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java index 1bd4285ee..d2e255968 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java @@ -108,6 +108,7 @@ public class RemoteShuffleResultPartitionDelegation { DataBuffer dataBuffer = isBroadcast ? 
getBroadcastDataBuffer() : getUnicastDataBuffer(); if (dataBuffer.append(record, targetSubpartition, dataType)) { + incNumRecordsOut(dataType); return; } @@ -117,6 +118,7 @@ public class RemoteShuffleResultPartitionDelegation { dataBuffer.finish(); dataBuffer.release(); writeLargeRecord(record, targetSubpartition, dataType, isBroadcast); + incNumRecordsOut(dataType); return; } flushDataBuffer(dataBuffer, isBroadcast); @@ -127,6 +129,12 @@ public class RemoteShuffleResultPartitionDelegation { emit(record, targetSubpartition, dataType, isBroadcast); } + private void incNumRecordsOut(Buffer.DataType dataType) { + if (dataType.isBuffer()) { + outputGate.shuffleIOMetricGroup.getNumRecordsOut().inc(); + } + } + @VisibleForTesting public DataBuffer getUnicastDataBuffer() throws IOException { flushBroadcastDataBuffer(); diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/ShuffleIOMetricGroup.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/ShuffleIOMetricGroup.java new file mode 100644 index 000000000..16a9f4cf5 --- /dev/null +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/ShuffleIOMetricGroup.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.plugin.flink.metric; + +import static org.apache.celeborn.plugin.flink.metric.RemoteShuffleMetricFactory.createShuffleIOOwnerMetricGroup; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup; + +/** + * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is + * forwarded to the parent shuffle metric group. + */ +public class ShuffleIOMetricGroup extends ProxyMetricGroup<MetricGroup> { + + private final Counter numBytesIn; + private final Counter numBytesOut; + private final Counter numRecordsOut; + + private final Meter numBytesInRate; + private final Meter numBytesOutRate; + private final Meter numRecordsOutRate; + + public ShuffleIOMetricGroup(MetricGroup taskIOMetricGroup, int shuffleId) { + super(createShuffleIOOwnerMetricGroup(taskIOMetricGroup).addGroup(shuffleId)); + this.numBytesIn = counter(MetricNames.IO_NUM_BYTES_IN, new SimpleCounter()); + this.numBytesOut = counter(MetricNames.IO_NUM_BYTES_OUT, new SimpleCounter()); + this.numRecordsOut = counter(MetricNames.IO_NUM_RECORDS_OUT, new SimpleCounter()); + this.numBytesInRate = meter(MetricNames.IO_NUM_BYTES_IN_RATE, new MeterView(numBytesIn)); + this.numBytesOutRate = meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new MeterView(numBytesOut)); + this.numRecordsOutRate = + meter(MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut)); + } + + // ============================================================================================ + // Getters + // 
============================================================================================ + + public Counter getNumBytesIn() { + return numBytesIn; + } + + public Counter getNumBytesOut() { + return numBytesOut; + } + + public Counter getNumRecordsOut() { + return numRecordsOut; + } + + public Meter getNumBytesInRate() { + return numBytesInRate; + } + + public Meter getNumBytesOutRate() { + return numBytesOutRate; + } + + public Meter getNumRecordsOutRate() { + return numRecordsOutRate; + } +} diff --git a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java index 33c31b2e4..cc3cf9761 100644 --- a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java +++ b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java @@ -22,39 +22,52 @@ import static org.mockito.Mockito.*; import java.io.IOException; import java.util.Optional; +import java.util.Random; +import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.protocol.PartitionLocation; import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl; public class RemoteShuffleOutputGateSuiteJ { - private final RemoteShuffleOutputGate remoteShuffleOutputGate = - mock(RemoteShuffleOutputGate.class); - private final FlinkShuffleClientImpl 
shuffleClient = mock(FlinkShuffleClientImpl.class); private static final int BUFFER_SIZE = 20; private BufferPool bufferPool; + private RemoteShuffleOutputGate remoteShuffleOutputGate; @Before public void setup() throws IOException { - remoteShuffleOutputGate.flinkShuffleClient = shuffleClient; + remoteShuffleOutputGate = + new RemoteShuffleOutputGate( + shuffleDescriptor(), + 2, + BUFFER_SIZE, + () -> bufferPool, + new CelebornConf(), + 10, + new UnregisteredMetricsGroup(), + mock(FlinkShuffleClientImpl.class)); NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, BUFFER_SIZE); bufferPool = networkBufferPool.createBufferPool(10, 10); } @Test - public void TestSimpleWriteData() throws IOException, InterruptedException { + public void testSimpleWriteData() throws IOException, InterruptedException { PartitionLocation partitionLocation = new PartitionLocation( 1, 0, "localhost", 123, 245, 789, 238, PartitionLocation.Mode.PRIMARY); - when(shuffleClient.registerMapPartitionTask(anyInt(), anyInt(), anyInt(), anyInt(), anyInt())) + when(remoteShuffleOutputGate.flinkShuffleClient.registerMapPartitionTask( + anyInt(), anyInt(), anyInt(), anyInt(), anyInt())) .thenAnswer(t -> partitionLocation); when(remoteShuffleOutputGate.flinkShuffleClient.pushDataHandShake( anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), any())) @@ -67,7 +80,19 @@ public class RemoteShuffleOutputGateSuiteJ { .thenAnswer(t -> Optional.empty()); remoteShuffleOutputGate.regionStart(false); - remoteShuffleOutputGate.write(bufferPool.requestBuffer(), 0); + Buffer buffer = bufferPool.requestBuffer(); + buffer.asByteBuf().writeByte(10); + remoteShuffleOutputGate.write(buffer, 0); + + when(remoteShuffleOutputGate.flinkShuffleClient.pushDataToLocation( + anyInt(), anyInt(), anyInt(), anyInt(), any(), any(), any())) + .thenReturn(buffer.getSize()); + remoteShuffleOutputGate.write(buffer, 1); + Assert.assertEquals( + buffer.getSize(), 
remoteShuffleOutputGate.shuffleIOMetricGroup.getNumBytesOut().getCount()); + Assert.assertEquals( + buffer.getSize(), + remoteShuffleOutputGate.shuffleIOMetricGroup.getNumBytesOutRate().getCount()); doNothing() .when(remoteShuffleOutputGate.flinkShuffleClient) @@ -97,4 +122,16 @@ public class RemoteShuffleOutputGateSuiteJ { Assert.assertEquals(0, byteBuf.refCnt()); Assert.assertEquals(0, celebornByteBuf.refCnt()); } + + private RemoteShuffleDescriptor shuffleDescriptor() { + byte[] bytes = new byte[16]; + new Random().nextBytes(bytes); + return new RemoteShuffleDescriptor( + new JobID(bytes).toString(), + new JobID(bytes), + new JobID(bytes).toString(), + new ResultPartitionID(), + new RemoteShuffleResource( + "1", 1, System.currentTimeMillis(), new ShuffleResourceDescriptor(1, 1, 1, 0))); + } } diff --git a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java index 348fb3556..32b22fa1e 100644 --- a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java +++ b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.throughput.ThroughputCalculator; import org.apache.flink.util.CloseableIterator; @@ -59,7 +60,7 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate { public RemoteShuffleInputGate( CelebornConf celebornConf, - String taskName, + 
ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor gateDescriptor, SupplierWithException<BufferPool, IOException> bufferPoolFactory, @@ -67,7 +68,7 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate { int numConcurrentReading) { super( celebornConf, - taskName, + ownerContext, gateIndex, gateDescriptor, bufferPoolFactory, diff --git a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java index 54737f03a..faf733bef 100644 --- a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java +++ b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.function.SupplierWithException; import org.apache.celeborn.common.CelebornConf; @@ -38,7 +39,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat @Override protected RemoteShuffleInputGate createInputGate( - String owningTaskName, + ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor igdd, SupplierWithException<BufferPool, IOException> bufferPoolFactory, @@ -46,8 +47,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat BufferDecompressor bufferDecompressor = new BufferDecompressor(networkBufferSize, compressionCodec); return new RemoteShuffleInputGate( - this.celebornConf, - owningTaskName, + celebornConf, + ownerContext, gateIndex, igdd, 
bufferPoolFactory, diff --git a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java index 547d3fa08..363366b46 100644 --- a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java +++ b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.function.SupplierWithException; import org.apache.celeborn.common.CelebornConf; @@ -47,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory @Override ResultPartition createRemoteShuffleResultPartitionInternal( - String taskNameWithSubtaskAndId, + ShuffleIOOwnerContext ownerContext, int partitionIndex, ResultPartitionID id, ResultPartitionType type, @@ -59,7 +60,7 @@ public class RemoteShuffleResultPartitionFactory BufferCompressor bufferCompressor, RemoteShuffleDescriptor rsd) { return new RemoteShuffleResultPartition( - taskNameWithSubtaskAndId, + ownerContext.getOwnerName(), partitionIndex, id, type, @@ -75,7 +76,8 @@ public class RemoteShuffleResultPartitionFactory networkBufferSize, bufferPoolFactories.get(1), celebornConf, - numMappers)); + numMappers, + ownerContext.getParentGroup())); } protected BufferCompressor getBufferCompressor() { diff --git a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java 
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java index 14c2a7b59..22c4764f2 100644 --- a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java +++ b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java @@ -44,6 +44,7 @@ import java.util.stream.IntStream; import org.apache.flink.api.common.JobID; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -68,9 +69,9 @@ import org.apache.celeborn.plugin.flink.utils.BufferUtils; public class RemoteShuffleResultPartitionSuiteJ { private final int networkBufferSize = 32 * 1024; - private BufferCompressor bufferCompressor = new BufferCompressor(networkBufferSize, "lz4"); - private RemoteShuffleOutputGate remoteShuffleOutputGate = mock(RemoteShuffleOutputGate.class); - private final String compressCodec = "LZ4"; + private final BufferCompressor bufferCompressor = new BufferCompressor(networkBufferSize, "lz4"); + private final RemoteShuffleOutputGate remoteShuffleOutputGate = + mock(RemoteShuffleOutputGate.class); private final CelebornConf conf = new CelebornConf(); BufferDecompressor bufferDecompressor = new BufferDecompressor(networkBufferSize, "LZ4"); @@ -80,7 +81,7 @@ public class RemoteShuffleResultPartitionSuiteJ { private NetworkBufferPool globalBufferPool; - private BufferPool dataBufferPool; + private BufferPool sortBufferPool; private BufferPool nettyBufferPool; @@ -99,8 +100,8 @@ public class RemoteShuffleResultPartitionSuiteJ { outputGate.release(); } - if (dataBufferPool != null) { - dataBufferPool.lazyDestroy(); + 
if (sortBufferPool != null) { + sortBufferPool.lazyDestroy(); } if (nettyBufferPool != null) { nettyBufferPool.lazyDestroy(); @@ -173,7 +174,9 @@ public class RemoteShuffleResultPartitionSuiteJ { random.nextBytes(dataWritten); ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten); partitionWriter.emitRecord(recordWritten, 0); - assertEquals(0, dataBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.finish(); partitionWriter.close(); @@ -206,7 +209,9 @@ public class RemoteShuffleResultPartitionSuiteJ { random.nextBytes(dataWritten); ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten); partitionWriter.broadcastRecord(recordWritten); - assertEquals(0, dataBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.finish(); partitionWriter.close(); @@ -245,20 +250,26 @@ public class RemoteShuffleResultPartitionSuiteJ { partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0); partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1); - assertEquals(3, dataBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(2, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(2, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize)); - assertEquals(2, dataBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(3, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(3, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.flush(0); - assertEquals(0, dataBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2); partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3); - assertEquals(3, dataBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(5, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(5, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.flushAll(); - assertEquals(0, dataBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); partitionWriter.finish(); partitionWriter.close(); @@ -307,6 +318,8 @@ public class RemoteShuffleResultPartitionSuiteJ { record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten, numBytesWritten); } } + assertEquals(numRecords, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(numRecords, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.finish(); assertTrue(outputGate.isFinished()); @@ -357,14 +370,14 @@ public class RemoteShuffleResultPartitionSuiteJ { private void initResultPartitionWriter( int numSubpartitions, - int dataBufferPoolSize, + int sortBufferPoolSize, int nettyBufferPoolSize, boolean compressionEnabled, CelebornConf conf, int numMappers) throws Exception { - dataBufferPool = globalBufferPool.createBufferPool(dataBufferPoolSize, dataBufferPoolSize); + sortBufferPool = globalBufferPool.createBufferPool(sortBufferPoolSize, sortBufferPoolSize); nettyBufferPool = globalBufferPool.createBufferPool(nettyBufferPoolSize, nettyBufferPoolSize); outputGate = @@ -384,7 +397,7 @@ public class 
RemoteShuffleResultPartitionSuiteJ { bufferSize, new ResultPartitionManager(), bufferCompressor, - () -> dataBufferPool, + () -> sortBufferPool, outputGate); } else { partitionWriter = @@ -398,7 +411,7 @@ public class RemoteShuffleResultPartitionSuiteJ { bufferSize, new ResultPartitionManager(), null, - () -> dataBufferPool, + () -> sortBufferPool, outputGate); } } @@ -439,7 +452,8 @@ public class RemoteShuffleResultPartitionSuiteJ { bufferSize, bufferPoolFactory, celebornConf, - numMappers); + numMappers, + new UnregisteredMetricsGroup()); isSetup = false; isFinished = false; isClosed = false; @@ -551,7 +565,7 @@ public class RemoteShuffleResultPartitionSuiteJ { } } - private RemoteShuffleDescriptor getShuffleDescriptor() throws Exception { + private RemoteShuffleDescriptor getShuffleDescriptor() { Random random = new Random(); byte[] bytes = new byte[16]; random.nextBytes(bytes); diff --git a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java index a8fdf4774..56169b6d6 100644 --- a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java +++ b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.throughput.ThroughputCalculator; import org.apache.flink.util.CloseableIterator; @@ -59,7 +60,7 @@ public class RemoteShuffleInputGate extends 
AbstractRemoteShuffleInputGate { public RemoteShuffleInputGate( CelebornConf celebornConf, - String taskName, + ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor gateDescriptor, SupplierWithException<BufferPool, IOException> bufferPoolFactory, @@ -67,7 +68,7 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate { int numConcurrentReading) { super( celebornConf, - taskName, + ownerContext, gateIndex, gateDescriptor, bufferPoolFactory, diff --git a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java index ef8ea9655..f4102ec01 100644 --- a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java +++ b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.function.SupplierWithException; import org.apache.celeborn.common.CelebornConf; @@ -39,7 +40,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat // For testing. 
@Override protected RemoteShuffleInputGate createInputGate( - String owningTaskName, + ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor igdd, SupplierWithException<BufferPool, IOException> bufferPoolFactory, @@ -47,8 +48,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat BufferDecompressor bufferDecompressor = new BufferDecompressor(networkBufferSize, compressionCodec); return new RemoteShuffleInputGate( - this.celebornConf, - owningTaskName, + celebornConf, + ownerContext, gateIndex, igdd, bufferPoolFactory, diff --git a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java index 547d3fa08..363366b46 100644 --- a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java +++ b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.function.SupplierWithException; import org.apache.celeborn.common.CelebornConf; @@ -47,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory @Override ResultPartition createRemoteShuffleResultPartitionInternal( - String taskNameWithSubtaskAndId, + ShuffleIOOwnerContext ownerContext, int partitionIndex, ResultPartitionID id, ResultPartitionType type, @@ -59,7 +60,7 @@ public class RemoteShuffleResultPartitionFactory BufferCompressor bufferCompressor, RemoteShuffleDescriptor rsd) { return new 
RemoteShuffleResultPartition( - taskNameWithSubtaskAndId, + ownerContext.getOwnerName(), partitionIndex, id, type, @@ -75,7 +76,8 @@ public class RemoteShuffleResultPartitionFactory networkBufferSize, bufferPoolFactories.get(1), celebornConf, - numMappers)); + numMappers, + ownerContext.getParentGroup())); } protected BufferCompressor getBufferCompressor() { diff --git a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java index c02e1ad8b..22c4764f2 100644 --- a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java +++ b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java @@ -44,6 +44,7 @@ import java.util.stream.IntStream; import org.apache.flink.api.common.JobID; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -174,6 +175,8 @@ public class RemoteShuffleResultPartitionSuiteJ { ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten); partitionWriter.emitRecord(recordWritten, 0); assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.finish(); partitionWriter.close(); @@ -207,6 +210,8 @@ public class RemoteShuffleResultPartitionSuiteJ { ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten); partitionWriter.broadcastRecord(recordWritten); assertEquals(0, 
sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.finish(); partitionWriter.close(); @@ -246,9 +251,13 @@ public class RemoteShuffleResultPartitionSuiteJ { partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0); partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1); assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(2, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(2, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize)); assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(3, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(3, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.flush(0); assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); @@ -256,6 +265,8 @@ public class RemoteShuffleResultPartitionSuiteJ { partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2); partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3); assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(5, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(5, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.flushAll(); assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); @@ -307,6 +318,8 @@ public class RemoteShuffleResultPartitionSuiteJ { record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten, numBytesWritten); } } + assertEquals(numRecords, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(numRecords, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.finish(); 
assertTrue(outputGate.isFinished()); @@ -439,7 +452,8 @@ public class RemoteShuffleResultPartitionSuiteJ { bufferSize, bufferPoolFactory, celebornConf, - numMappers); + numMappers, + new UnregisteredMetricsGroup()); isSetup = false; isFinished = false; isClosed = false; diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java index 90041fa0e..0f93e6975 100644 --- a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java +++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.throughput.ThroughputCalculator; import org.apache.flink.util.CloseableIterator; @@ -59,7 +60,7 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate { public RemoteShuffleInputGate( CelebornConf celebornConf, - String taskName, + ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor gateDescriptor, SupplierWithException<BufferPool, IOException> bufferPoolFactory, @@ -67,7 +68,7 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate { int numConcurrentReading) { super( celebornConf, - taskName, + ownerContext, gateIndex, gateDescriptor, bufferPoolFactory, diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java 
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java index ef8ea9655..f4102ec01 100644 --- a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java +++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.function.SupplierWithException; import org.apache.celeborn.common.CelebornConf; @@ -39,7 +40,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat // For testing. @Override protected RemoteShuffleInputGate createInputGate( - String owningTaskName, + ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor igdd, SupplierWithException<BufferPool, IOException> bufferPoolFactory, @@ -47,8 +48,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat BufferDecompressor bufferDecompressor = new BufferDecompressor(networkBufferSize, compressionCodec); return new RemoteShuffleInputGate( - this.celebornConf, - owningTaskName, + celebornConf, + ownerContext, gateIndex, igdd, bufferPoolFactory, diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java index 547d3fa08..363366b46 100644 --- a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java +++ 
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.function.SupplierWithException; import org.apache.celeborn.common.CelebornConf; @@ -47,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory @Override ResultPartition createRemoteShuffleResultPartitionInternal( - String taskNameWithSubtaskAndId, + ShuffleIOOwnerContext ownerContext, int partitionIndex, ResultPartitionID id, ResultPartitionType type, @@ -59,7 +60,7 @@ public class RemoteShuffleResultPartitionFactory BufferCompressor bufferCompressor, RemoteShuffleDescriptor rsd) { return new RemoteShuffleResultPartition( - taskNameWithSubtaskAndId, + ownerContext.getOwnerName(), partitionIndex, id, type, @@ -75,7 +76,8 @@ public class RemoteShuffleResultPartitionFactory networkBufferSize, bufferPoolFactories.get(1), celebornConf, - numMappers)); + numMappers, + ownerContext.getParentGroup())); } protected BufferCompressor getBufferCompressor() { diff --git a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java index c02e1ad8b..22c4764f2 100644 --- a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java +++ b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java @@ -44,6 +44,7 @@ import java.util.stream.IntStream; import org.apache.flink.api.common.JobID; import 
org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -174,6 +175,8 @@ public class RemoteShuffleResultPartitionSuiteJ { ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten); partitionWriter.emitRecord(recordWritten, 0); assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.finish(); partitionWriter.close(); @@ -207,6 +210,8 @@ public class RemoteShuffleResultPartitionSuiteJ { ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten); partitionWriter.broadcastRecord(recordWritten); assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.finish(); partitionWriter.close(); @@ -246,9 +251,13 @@ public class RemoteShuffleResultPartitionSuiteJ { partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0); partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1); assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(2, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(2, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize)); assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(3, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(3, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.flush(0); assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); @@ -256,6 +265,8 @@ public class RemoteShuffleResultPartitionSuiteJ { partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2); partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3); assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(5, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(5, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.flushAll(); assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); @@ -307,6 +318,8 @@ public class RemoteShuffleResultPartitionSuiteJ { record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten, numBytesWritten); } } + assertEquals(numRecords, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(numRecords, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.finish(); assertTrue(outputGate.isFinished()); @@ -439,7 +452,8 @@ public class RemoteShuffleResultPartitionSuiteJ { bufferSize, bufferPoolFactory, celebornConf, - numMappers); + numMappers, + new UnregisteredMetricsGroup()); isSetup = false; isFinished = false; isClosed = false; diff --git a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java index dfe2eeca8..6d513265e 100644 --- a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java +++ b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java @@ -47,6 +47,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.throughput.ThroughputCalculator; import org.apache.flink.util.CloseableIterator; @@ -60,7 +61,7 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate { public RemoteShuffleInputGate( CelebornConf celebornConf, - String taskName, + ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor gateDescriptor, SupplierWithException<BufferPool, IOException> bufferPoolFactory, @@ -68,7 +69,7 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate { int numConcurrentReading) { super( celebornConf, - taskName, + ownerContext, gateIndex, gateDescriptor, bufferPoolFactory, diff --git a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java index ef8ea9655..f4102ec01 100644 --- a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java +++ b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.function.SupplierWithException; import org.apache.celeborn.common.CelebornConf; @@ -39,7 +40,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat // For testing. 
@Override protected RemoteShuffleInputGate createInputGate( - String owningTaskName, + ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor igdd, SupplierWithException<BufferPool, IOException> bufferPoolFactory, @@ -47,8 +48,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat BufferDecompressor bufferDecompressor = new BufferDecompressor(networkBufferSize, compressionCodec); return new RemoteShuffleInputGate( - this.celebornConf, - owningTaskName, + celebornConf, + ownerContext, gateIndex, igdd, bufferPoolFactory, diff --git a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java index 547d3fa08..363366b46 100644 --- a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java +++ b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.function.SupplierWithException; import org.apache.celeborn.common.CelebornConf; @@ -47,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory @Override ResultPartition createRemoteShuffleResultPartitionInternal( - String taskNameWithSubtaskAndId, + ShuffleIOOwnerContext ownerContext, int partitionIndex, ResultPartitionID id, ResultPartitionType type, @@ -59,7 +60,7 @@ public class RemoteShuffleResultPartitionFactory BufferCompressor bufferCompressor, RemoteShuffleDescriptor rsd) { return new 
RemoteShuffleResultPartition( - taskNameWithSubtaskAndId, + ownerContext.getOwnerName(), partitionIndex, id, type, @@ -75,7 +76,8 @@ public class RemoteShuffleResultPartitionFactory networkBufferSize, bufferPoolFactories.get(1), celebornConf, - numMappers)); + numMappers, + ownerContext.getParentGroup())); } protected BufferCompressor getBufferCompressor() { diff --git a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java index fece38532..3cc25bcbc 100644 --- a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java +++ b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java @@ -44,6 +44,7 @@ import java.util.stream.IntStream; import org.apache.flink.api.common.JobID; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -174,6 +175,8 @@ public class RemoteShuffleResultPartitionSuiteJ { ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten); partitionWriter.emitRecord(recordWritten, 0); assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.finish(); partitionWriter.close(); @@ -207,6 +210,8 @@ public class RemoteShuffleResultPartitionSuiteJ { ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten); partitionWriter.broadcastRecord(recordWritten); assertEquals(0, 
sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.finish(); partitionWriter.close(); @@ -246,9 +251,13 @@ public class RemoteShuffleResultPartitionSuiteJ { partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0); partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1); assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(2, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(2, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize)); assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(3, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(3, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.flush(0); assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); @@ -256,6 +265,8 @@ public class RemoteShuffleResultPartitionSuiteJ { partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2); partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3); assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(5, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(5, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.flushAll(); assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); @@ -307,6 +318,8 @@ public class RemoteShuffleResultPartitionSuiteJ { record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten, numBytesWritten); } } + assertEquals(numRecords, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(numRecords, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.finish(); 
assertTrue(outputGate.isFinished()); @@ -441,7 +454,8 @@ public class RemoteShuffleResultPartitionSuiteJ { bufferSize, bufferPoolFactory, celebornConf, - numMappers); + numMappers, + new UnregisteredMetricsGroup()); isSetup = false; isFinished = false; isClosed = false; diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java index dfe2eeca8..6d513265e 100644 --- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java +++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java @@ -47,6 +47,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.throughput.ThroughputCalculator; import org.apache.flink.util.CloseableIterator; @@ -60,7 +61,7 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate { public RemoteShuffleInputGate( CelebornConf celebornConf, - String taskName, + ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor gateDescriptor, SupplierWithException<BufferPool, IOException> bufferPoolFactory, @@ -68,7 +69,7 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate { int numConcurrentReading) { super( celebornConf, - taskName, + ownerContext, gateIndex, gateDescriptor, bufferPoolFactory, diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java index 6f1fd803e..bb4abbc70 100644 --- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java +++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.function.SupplierWithException; import org.apache.celeborn.common.CelebornConf; @@ -40,7 +41,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat // For testing. @Override protected RemoteShuffleInputGate createInputGate( - String owningTaskName, + ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor igdd, SupplierWithException<BufferPool, IOException> bufferPoolFactory, @@ -48,8 +49,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat BufferDecompressor bufferDecompressor = new BufferDecompressor(networkBufferSize, CompressionCodec.valueOf(compressionCodec)); return new RemoteShuffleInputGate( - this.celebornConf, - owningTaskName, + celebornConf, + ownerContext, gateIndex, igdd, bufferPoolFactory, diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java index a9742cc86..69b9a71bc 100644 --- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java +++ 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.function.SupplierWithException; import org.apache.celeborn.common.CelebornConf; @@ -48,7 +49,7 @@ public class RemoteShuffleResultPartitionFactory @Override ResultPartition createRemoteShuffleResultPartitionInternal( - String taskNameWithSubtaskAndId, + ShuffleIOOwnerContext ownerContext, int partitionIndex, ResultPartitionID id, ResultPartitionType type, @@ -60,7 +61,7 @@ public class RemoteShuffleResultPartitionFactory BufferCompressor bufferCompressor, RemoteShuffleDescriptor rsd) { return new RemoteShuffleResultPartition( - taskNameWithSubtaskAndId, + ownerContext.getOwnerName(), partitionIndex, id, type, @@ -76,7 +77,8 @@ public class RemoteShuffleResultPartitionFactory networkBufferSize, bufferPoolFactories.get(1), celebornConf, - numMappers)); + numMappers, + ownerContext.getParentGroup())); } @Override diff --git a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java index 99f502d87..0ceabcca8 100644 --- a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java +++ b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java @@ -45,6 +45,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec; import 
org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -177,6 +178,8 @@ public class RemoteShuffleResultPartitionSuiteJ { ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten); partitionWriter.emitRecord(recordWritten, 0); assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.finish(); partitionWriter.close(); @@ -210,6 +213,8 @@ public class RemoteShuffleResultPartitionSuiteJ { ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten); partitionWriter.broadcastRecord(recordWritten); assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.finish(); partitionWriter.close(); @@ -249,9 +254,13 @@ public class RemoteShuffleResultPartitionSuiteJ { partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0); partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1); assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(2, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(2, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize)); assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(3, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(3, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.flush(0); assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); @@ -259,6 +268,8 @@ public class RemoteShuffleResultPartitionSuiteJ { partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2); partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3); assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(5, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(5, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.flushAll(); assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); @@ -310,6 +321,8 @@ public class RemoteShuffleResultPartitionSuiteJ { record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten, numBytesWritten); } } + assertEquals(numRecords, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(numRecords, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.finish(); assertTrue(outputGate.isFinished()); @@ -444,7 +457,8 @@ public class RemoteShuffleResultPartitionSuiteJ { bufferSize, bufferPoolFactory, celebornConf, - numMappers); + numMappers, + new UnregisteredMetricsGroup()); isSetup = false; isFinished = false; isClosed = false; diff --git a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java index 85443db36..7434b8cff 100644 --- a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java +++ b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java @@ -47,6 +47,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.throughput.ThroughputCalculator; import org.apache.flink.util.CloseableIterator; @@ -60,7 +61,7 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate { public RemoteShuffleInputGate( CelebornConf celebornConf, - String taskName, + ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor gateDescriptor, SupplierWithException<BufferPool, IOException> bufferPoolFactory, @@ -68,7 +69,7 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate { int numConcurrentReading) { super( celebornConf, - taskName, + ownerContext, gateIndex, gateDescriptor, bufferPoolFactory, diff --git a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java index 6f1fd803e..bb4abbc70 100644 --- a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java +++ b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.function.SupplierWithException; import org.apache.celeborn.common.CelebornConf; @@ -40,7 +41,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat // For testing. 
@Override protected RemoteShuffleInputGate createInputGate( - String owningTaskName, + ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor igdd, SupplierWithException<BufferPool, IOException> bufferPoolFactory, @@ -48,8 +49,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat BufferDecompressor bufferDecompressor = new BufferDecompressor(networkBufferSize, CompressionCodec.valueOf(compressionCodec)); return new RemoteShuffleInputGate( - this.celebornConf, - owningTaskName, + celebornConf, + ownerContext, gateIndex, igdd, bufferPoolFactory, diff --git a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java index a9742cc86..69b9a71bc 100644 --- a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java +++ b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.function.SupplierWithException; import org.apache.celeborn.common.CelebornConf; @@ -48,7 +49,7 @@ public class RemoteShuffleResultPartitionFactory @Override ResultPartition createRemoteShuffleResultPartitionInternal( - String taskNameWithSubtaskAndId, + ShuffleIOOwnerContext ownerContext, int partitionIndex, ResultPartitionID id, ResultPartitionType type, @@ -60,7 +61,7 @@ public class RemoteShuffleResultPartitionFactory BufferCompressor bufferCompressor, 
RemoteShuffleDescriptor rsd) { return new RemoteShuffleResultPartition( - taskNameWithSubtaskAndId, + ownerContext.getOwnerName(), partitionIndex, id, type, @@ -76,7 +77,8 @@ public class RemoteShuffleResultPartitionFactory networkBufferSize, bufferPoolFactories.get(1), celebornConf, - numMappers)); + numMappers, + ownerContext.getParentGroup())); } @Override diff --git a/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java index 99f502d87..0ceabcca8 100644 --- a/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java +++ b/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java @@ -45,6 +45,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -177,6 +178,8 @@ public class RemoteShuffleResultPartitionSuiteJ { ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten); partitionWriter.emitRecord(recordWritten, 0); assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.finish(); partitionWriter.close(); @@ -210,6 +213,8 @@ public class RemoteShuffleResultPartitionSuiteJ { ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten); 
partitionWriter.broadcastRecord(recordWritten); assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.finish(); partitionWriter.close(); @@ -249,9 +254,13 @@ public class RemoteShuffleResultPartitionSuiteJ { partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0); partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1); assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(2, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(2, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize)); assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(3, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(3, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.flush(0); assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); @@ -259,6 +268,8 @@ public class RemoteShuffleResultPartitionSuiteJ { partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2); partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3); assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers()); + assertEquals(5, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(5, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.flushAll(); assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers()); @@ -310,6 +321,8 @@ public class RemoteShuffleResultPartitionSuiteJ { record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten, numBytesWritten); } } + assertEquals(numRecords, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount()); + assertEquals(numRecords, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount()); partitionWriter.finish(); assertTrue(outputGate.isFinished()); @@ -444,7 +457,8 @@ public class RemoteShuffleResultPartitionSuiteJ { bufferSize, bufferPoolFactory, celebornConf, - numMappers); + numMappers, + new UnregisteredMetricsGroup()); isSetup = false; isFinished = false; isClosed = false;
