This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 469a0c76 [ISSUE-390] Print more infos after read finished (#395)
469a0c76 is described below
commit 469a0c76264658ad8a9459a7b4785f343cbb3357
Author: xianjingfeng <[email protected]>
AuthorDate: Sat Dec 10 15:49:44 2022 +0800
[ISSUE-390] Print more infos after read finished (#395)
### What changes were proposed in this pull request?
1. Print how much data the client read from each server.
2. Print how much data was skipped.
### Why are the changes needed?
Currently, we do not know how much data the client reads from each server
or how much data is skipped. #390
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Ran UTs and checked the logs.
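For context, a minimal sketch of how the new flag is driven (based on the diff
below; the skip condition here is only illustrative):
```java
// Consumer side, compare ShuffleReadClientImpl in this patch: for every buffer
// segment, report whether the block was actually consumed (false) or skipped
// (true), e.g. because it was already served by another replica.
for (BufferSegment bs : sdr.getBufferSegments()) {
  boolean skipped = !pendingBlockIds.contains(bs.getBlockId()); // illustrative condition
  clientReadHandler.updateConsumedBlockInfo(bs, skipped);
}
// When reading finishes, logConsumedBlockInfo() prints per-server statistics,
// roughly in the form asserted by the tests in this patch:
//   Client read 8 blocks from [<shuffle server>], Consumed[ hot:3 warm:3 cold:2 frozen:0 ], Skipped[ hot:0 warm:0 cold:0 frozen:0 ]
clientReadHandler.logConsumedBlockInfo();
```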
---
.../uniffle/client/impl/ShuffleReadClientImpl.java | 4 +-
.../test/ShuffleServerFaultToleranceTest.java | 45 +++++++--
.../test/ShuffleServerWithMemLocalHdfsTest.java | 62 ++++++++++---
.../uniffle/test/ShuffleServerWithMemoryTest.java | 7 +-
.../storage/factory/ShuffleHandlerFactory.java | 2 +-
.../storage/handler/ClientReadHandlerMetric.java | 101 +++++++++++++++++++++
.../storage/handler/api/ClientReadHandler.java | 2 +-
.../handler/impl/AbstractClientReadHandler.java | 36 +++++++-
.../handler/impl/ComposedClientReadHandler.java | 96 ++++++++++----------
.../handler/impl/HdfsClientReadHandler.java | 22 -----
.../impl/MultiReplicaClientReadHandler.java | 18 +---
11 files changed, 285 insertions(+), 110 deletions(-)
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index a6de15b9..1cc80a41 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -199,6 +199,7 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
//so exception should not be thrown here if blocks have multiple
replicas
if (shuffleServerInfoList.size() > 1) {
LOG.warn(errMsg);
+ clientReadHandler.updateConsumedBlockInfo(bs, true);
continue;
} else {
throw new RssException(errMsg);
@@ -209,9 +210,10 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
processedBlockIds.addLong(bs.getBlockId());
pendingBlockIds.removeLong(bs.getBlockId());
// only update the statistics of necessary blocks
- clientReadHandler.updateConsumedBlockInfo(bs);
+ clientReadHandler.updateConsumedBlockInfo(bs, false);
break;
}
+ clientReadHandler.updateConsumedBlockInfo(bs, true);
// mark block as processed
processedBlockIds.addLong(bs.getBlockId());
pendingBlockIds.removeLong(bs.getBlockId());
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
index 636ba65f..82f44089 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
@@ -37,6 +37,7 @@ import
org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
@@ -49,11 +50,15 @@ import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.buffer.ShuffleBuffer;
import org.apache.uniffle.storage.factory.ShuffleHandlerFactory;
-import org.apache.uniffle.storage.handler.api.ClientReadHandler;
+import org.apache.uniffle.storage.handler.ClientReadHandlerMetric;
+import org.apache.uniffle.storage.handler.impl.AbstractClientReadHandler;
import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
import org.apache.uniffle.storage.util.StorageType;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class ShuffleServerFaultToleranceTest extends ShuffleReadWriteBase {
@@ -115,7 +120,8 @@ public class ShuffleServerFaultToleranceTest extends
ShuffleReadWriteBase {
CreateShuffleReadHandlerRequest request =
mockCreateShuffleReadHandlerRequest(
testAppId, shuffleId, partitionId, shuffleServerInfoList,
expectBlockIds, StorageType.MEMORY_LOCALFILE);
- ClientReadHandler clientReadHandler =
ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
+ AbstractClientReadHandler clientReadHandler =
+ (AbstractClientReadHandler)
ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
Map<Long, byte[]> expectedData = Maps.newHashMap();
expectedData.clear();
blocks.forEach((block) -> {
@@ -123,7 +129,15 @@ public class ShuffleServerFaultToleranceTest extends
ShuffleReadWriteBase {
});
ShuffleDataResult sdr = clientReadHandler.readShuffleData();
TestUtils.validateResult(expectedData, sdr);
-
+ for (BufferSegment bs : sdr.getBufferSegments()) {
+ clientReadHandler.updateConsumedBlockInfo(bs, false);
+ }
+ ClientReadHandlerMetric exceptMetric = mock(ClientReadHandlerMetric.class);
+ when(exceptMetric.getReadBlockNum()).thenReturn(3L);
+ when(exceptMetric.getReadLength()).thenReturn(75L);
+ when(exceptMetric.getReadUncompressLength()).thenReturn(75L);
+ ClientReadHandlerMetric readHandlerMetric =
clientReadHandler.getReadHandlerMetric();
+ assertTrue(readHandlerMetric.equals(exceptMetric));
// send data to shuffle server, and wait until flush to localfile
List<ShuffleBlockInfo> blocks2 = createShuffleBlockList(
shuffleId, partitionId, 0, 3, 25,
@@ -136,13 +150,22 @@ public class ShuffleServerFaultToleranceTest extends
ShuffleReadWriteBase {
waitFlush(testAppId, shuffleId);
request = mockCreateShuffleReadHandlerRequest(
testAppId, shuffleId, partitionId, shuffleServerInfoList,
expectBlockIds, StorageType.LOCALFILE);
- clientReadHandler =
ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
+ clientReadHandler = (AbstractClientReadHandler)
+ ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
sdr = clientReadHandler.readShuffleData();
blocks2.forEach((block) -> {
expectedData.put(block.getBlockId(), block.getData());
});
TestUtils.validateResult(expectedData, sdr);
-
+ for (BufferSegment bs : sdr.getBufferSegments()) {
+ clientReadHandler.updateConsumedBlockInfo(bs, false);
+ }
+ readHandlerMetric = clientReadHandler.getReadHandlerMetric();
+ exceptMetric = mock(ClientReadHandlerMetric.class);
+ when(exceptMetric.getReadBlockNum()).thenReturn(6L);
+ when(exceptMetric.getReadLength()).thenReturn(150L);
+ when(exceptMetric.getReadUncompressLength()).thenReturn(150L);
+ assertTrue(readHandlerMetric.equals(exceptMetric));
// send data to shuffle server, and wait until flush to hdfs
List<ShuffleBlockInfo> blocks3 = createShuffleBlockList(
shuffleId, partitionId, 0, 3, 150,
@@ -157,9 +180,19 @@ public class ShuffleServerFaultToleranceTest extends
ShuffleReadWriteBase {
waitFlush(testAppId, shuffleId);
request = mockCreateShuffleReadHandlerRequest(
testAppId, shuffleId, partitionId, shuffleServerInfoList,
expectBlockIds, StorageType.HDFS);
- clientReadHandler =
ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
+ clientReadHandler = (AbstractClientReadHandler)
+ ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
sdr = clientReadHandler.readShuffleData();
TestUtils.validateResult(expectedData, sdr);
+ for (BufferSegment bs : sdr.getBufferSegments()) {
+ clientReadHandler.updateConsumedBlockInfo(bs, false);
+ }
+ readHandlerMetric = clientReadHandler.getReadHandlerMetric();
+ exceptMetric = mock(ClientReadHandlerMetric.class);
+ when(exceptMetric.getReadBlockNum()).thenReturn(3L);
+ when(exceptMetric.getReadLength()).thenReturn(450L);
+ when(exceptMetric.getReadUncompressLength()).thenReturn(450L);
+ assertTrue(readHandlerMetric.equals(exceptMetric));
}
private CreateShuffleReadHandlerRequest mockCreateShuffleReadHandlerRequest(
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
index de74c1b5..e3e45aa6 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
@@ -38,6 +38,7 @@ import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.buffer.ShuffleBuffer;
@@ -88,9 +89,18 @@ public class ShuffleServerWithMemLocalHdfsTest extends
ShuffleReadWriteBase {
shuffleServerClient.close();
}
+ @Test
+ public void memoryLocalFileHDFSReadWithFilterAndSkipTest() throws Exception {
+ runTest(true);
+ }
+
@Test
public void memoryLocalFileHDFSReadWithFilterTest() throws Exception {
- String testAppId = "memoryLocalFileHDFSReadWithFilterTest";
+ runTest(false);
+ }
+
+ private void runTest(boolean checkSkippedMetrics) throws Exception {
+ String testAppId = "memoryLocalFileHDFSReadWithFilterTest_" + "ship_" +
checkSkippedMetrics;
int shuffleId = 0;
int partitionId = 0;
RssRegisterShuffleRequest rrsr = new RssRegisterShuffleRequest(testAppId,
0,
@@ -126,7 +136,9 @@ public class ShuffleServerWithMemLocalHdfsTest extends
ShuffleReadWriteBase {
handlers[0] = memoryClientReadHandler;
handlers[1] = localFileClientReadHandler;
handlers[2] = hdfsClientReadHandler;
- ComposedClientReadHandler composedClientReadHandler = new
ComposedClientReadHandler(handlers);
+ ShuffleServerInfo ssi = new ShuffleServerInfo(LOCALHOST,
SHUFFLE_SERVER_PORT);
+ ComposedClientReadHandler composedClientReadHandler = new
ComposedClientReadHandler(
+ ssi, handlers);
Map<Long, byte[]> expectedData = Maps.newHashMap();
expectedData.clear();
expectedData.put(blocks.get(0).getBlockId(), blocks.get(0).getData());
@@ -137,7 +149,7 @@ public class ShuffleServerWithMemLocalHdfsTest extends
ShuffleReadWriteBase {
processBlockIds.addLong(blocks.get(0).getBlockId());
processBlockIds.addLong(blocks.get(1).getBlockId());
processBlockIds.addLong(blocks.get(2).getBlockId());
- sdr.getBufferSegments().forEach(bs ->
composedClientReadHandler.updateConsumedBlockInfo(bs));
+ sdr.getBufferSegments().forEach(bs ->
composedClientReadHandler.updateConsumedBlockInfo(bs, checkSkippedMetrics));
// send data to shuffle server, and wait until flush to LocalFile
List<ShuffleBlockInfo> blocks2 = createShuffleBlockList(
@@ -148,7 +160,7 @@ public class ShuffleServerWithMemLocalHdfsTest extends
ShuffleReadWriteBase {
shuffleToBlocks = Maps.newHashMap();
shuffleToBlocks.put(shuffleId, partitionToBlocks);
rssdr = new RssSendShuffleDataRequest(
- testAppId, 3, 1000, shuffleToBlocks);
+ testAppId, 3, 1000, shuffleToBlocks);
shuffleServerClient.sendShuffleData(rssdr);
waitFlush(testAppId, shuffleId);
@@ -161,7 +173,7 @@ public class ShuffleServerWithMemLocalHdfsTest extends
ShuffleReadWriteBase {
validateResult(expectedData, sdr);
processBlockIds.addLong(blocks2.get(0).getBlockId());
processBlockIds.addLong(blocks2.get(1).getBlockId());
- sdr.getBufferSegments().forEach(bs ->
composedClientReadHandler.updateConsumedBlockInfo(bs));
+ sdr.getBufferSegments().forEach(bs ->
composedClientReadHandler.updateConsumedBlockInfo(bs, checkSkippedMetrics));
// read the 3-th segment from localFile
sdr = composedClientReadHandler.readShuffleData();
@@ -169,7 +181,7 @@ public class ShuffleServerWithMemLocalHdfsTest extends
ShuffleReadWriteBase {
expectedData.put(blocks2.get(2).getBlockId(), blocks2.get(2).getData());
validateResult(expectedData, sdr);
processBlockIds.addLong(blocks2.get(2).getBlockId());
- sdr.getBufferSegments().forEach(bs ->
composedClientReadHandler.updateConsumedBlockInfo(bs));
+ sdr.getBufferSegments().forEach(bs ->
composedClientReadHandler.updateConsumedBlockInfo(bs, checkSkippedMetrics));
// send data to shuffle server, and wait until flush to HDFS
List<ShuffleBlockInfo> blocks3 = createShuffleBlockList(
@@ -180,7 +192,7 @@ public class ShuffleServerWithMemLocalHdfsTest extends
ShuffleReadWriteBase {
shuffleToBlocks = Maps.newHashMap();
shuffleToBlocks.put(shuffleId, partitionToBlocks);
rssdr = new RssSendShuffleDataRequest(
- testAppId, 3, 1000, shuffleToBlocks);
+ testAppId, 3, 1000, shuffleToBlocks);
shuffleServerClient.sendShuffleData(rssdr);
waitFlush(testAppId, shuffleId);
@@ -192,18 +204,40 @@ public class ShuffleServerWithMemLocalHdfsTest extends
ShuffleReadWriteBase {
validateResult(expectedData, sdr);
processBlockIds.addLong(blocks3.get(0).getBlockId());
processBlockIds.addLong(blocks3.get(1).getBlockId());
- sdr.getBufferSegments().forEach(bs ->
composedClientReadHandler.updateConsumedBlockInfo(bs));
+ sdr.getBufferSegments().forEach(bs ->
composedClientReadHandler.updateConsumedBlockInfo(bs, checkSkippedMetrics));
// all segments are processed
sdr = composedClientReadHandler.readShuffleData();
assertNull(sdr);
- assert (composedClientReadHandler.getReadBlokNumInfo()
- .contains("Client read 8 blocks [ hot:3 warm:3 cold:2 frozen:0 ]"));
- assert (composedClientReadHandler.getReadLengthInfo()
- .contains("Client read 625 bytes [ hot:75 warm:150 cold:400 frozen:0
]"));
- assert (composedClientReadHandler.getReadUncompressLengthInfo()
- .contains("Client read 625 uncompressed bytes [ hot:75 warm:150
cold:400 frozen:0 ]"));
+ if (checkSkippedMetrics) {
+ String readBlokNumInfo = composedClientReadHandler.getReadBlokNumInfo();
+ assert (readBlokNumInfo.contains("Client read 0 blocks from [" + ssi +
"]")
+ && readBlokNumInfo.contains("Skipped[ hot:3 warm:3 cold:2 frozen:0
]")
+ && readBlokNumInfo.contains("Consumed[ hot:0 warm:0 cold:0 frozen:0
]"));
+ String readLengthInfo = composedClientReadHandler.getReadLengthInfo();
+ assert (readLengthInfo.contains("Client read 0 bytes from [" + ssi + "]")
+ && readLengthInfo.contains("Skipped[ hot:75 warm:150 cold:400
frozen:0 ]")
+ && readBlokNumInfo.contains("Consumed[ hot:0 warm:0 cold:0 frozen:0
]"));
+ String readUncompressLengthInfo =
composedClientReadHandler.getReadUncompressLengthInfo();
+ assert (readUncompressLengthInfo.contains("Client read 0 uncompressed
bytes from [" + ssi + "]")
+ && readUncompressLengthInfo.contains("Skipped[ hot:75 warm:150
cold:400 frozen:0 ]")
+ && readBlokNumInfo.contains("Consumed[ hot:0 warm:0 cold:0 frozen:0
]"));
+ } else {
+ String readBlokNumInfo = composedClientReadHandler.getReadBlokNumInfo();
+ assert (readBlokNumInfo.contains("Client read 8 blocks from [" + ssi +
"]")
+ && readBlokNumInfo.contains("Consumed[ hot:3 warm:3 cold:2 frozen:0
]")
+ && readBlokNumInfo.contains("Skipped[ hot:0 warm:0 cold:0 frozen:0
]"));
+ String readLengthInfo = composedClientReadHandler.getReadLengthInfo();
+ assert (readLengthInfo.contains("Client read 625 bytes from [" + ssi +
"]")
+ && readLengthInfo.contains("Consumed[ hot:75 warm:150 cold:400
frozen:0 ]")
+ && readBlokNumInfo.contains("Skipped[ hot:0 warm:0 cold:0 frozen:0
]"));
+ String readUncompressLengthInfo =
composedClientReadHandler.getReadUncompressLengthInfo();
+ assert (readUncompressLengthInfo.contains("Client read 625 uncompressed
bytes from [" + ssi + "]")
+ && readUncompressLengthInfo.contains("Consumed[ hot:75 warm:150
cold:400 frozen:0 ]")
+ && readBlokNumInfo.contains("Skipped[ hot:0 warm:0 cold:0 frozen:0
]"));
+ }
+
}
protected void waitFlush(String appId, int shuffleId) throws
InterruptedException {
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
index 03ed356a..accc280f 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
@@ -38,6 +38,7 @@ import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.buffer.ShuffleBuffer;
@@ -145,7 +146,8 @@ public class ShuffleServerWithMemoryTest extends
ShuffleReadWriteBase {
ClientReadHandler[] handlers = new ClientReadHandler[2];
handlers[0] = memoryClientReadHandler;
handlers[1] = localFileQuorumClientReadHandler;
- ComposedClientReadHandler composedClientReadHandler = new
ComposedClientReadHandler(handlers);
+ ComposedClientReadHandler composedClientReadHandler = new
ComposedClientReadHandler(
+ new ShuffleServerInfo(LOCALHOST, SHUFFLE_SERVER_PORT), handlers);
// read from memory with ComposedClientReadHandler
sdr = composedClientReadHandler.readShuffleData();
expectedData.clear();
@@ -243,7 +245,8 @@ public class ShuffleServerWithMemoryTest extends
ShuffleReadWriteBase {
ClientReadHandler[] handlers = new ClientReadHandler[2];
handlers[0] = memoryClientReadHandler;
handlers[1] = localFileClientReadHandler;
- ComposedClientReadHandler composedClientReadHandler = new
ComposedClientReadHandler(handlers);
+ ComposedClientReadHandler composedClientReadHandler = new
ComposedClientReadHandler(
+ new ShuffleServerInfo(LOCALHOST, SHUFFLE_SERVER_PORT), handlers);
Map<Long, byte[]> expectedData = Maps.newHashMap();
expectedData.clear();
expectedData.put(blocks.get(0).getBlockId(), blocks.get(0).getData());
diff --git a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index f3af0d46..e8fc4046 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -110,7 +110,7 @@ public class ShuffleHandlerFactory {
throw new RssException("This should not happen due to the unknown
storage type: " + storageType);
}
- return new ComposedClientReadHandler(handlers);
+ return new ComposedClientReadHandler(serverInfo, handlers);
}
private ClientReadHandler
getMemoryClientReadHandler(CreateShuffleReadHandlerRequest request,
ShuffleServerInfo ssi) {
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/ClientReadHandlerMetric.java b/storage/src/main/java/org/apache/uniffle/storage/handler/ClientReadHandlerMetric.java
new file mode 100644
index 00000000..3032ecfc
--- /dev/null
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/ClientReadHandlerMetric.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler;
+
+import java.util.Objects;
+
+public class ClientReadHandlerMetric {
+ private long readBlockNum = 0L;
+ private long readLength = 0L;
+ private long readUncompressLength = 0L;
+
+ private long skippedReadBlockNum = 0L;
+ private long skippedReadLength = 0L;
+ private long skippedReadUncompressLength = 0L;
+
+ public long getReadBlockNum() {
+ return readBlockNum;
+ }
+
+ public void incReadBlockNum() {
+ this.readBlockNum++;
+ }
+
+ public long getReadLength() {
+ return readLength;
+ }
+
+ public void incReadLength(long readLength) {
+ this.readLength += readLength;
+ }
+
+ public long getReadUncompressLength() {
+ return readUncompressLength;
+ }
+
+ public void incReadUncompressLength(long readUncompressLength) {
+ this.readUncompressLength += readUncompressLength;
+ }
+
+ public long getSkippedReadBlockNum() {
+ return skippedReadBlockNum;
+ }
+
+ public void incSkippedReadBlockNum() {
+ this.skippedReadBlockNum++;
+ }
+
+ public long getSkippedReadLength() {
+ return skippedReadLength;
+ }
+
+ public void incSkippedReadLength(long skippedReadLength) {
+ this.skippedReadLength += skippedReadLength;
+ }
+
+ public long getSkippedReadUncompressLength() {
+ return skippedReadUncompressLength;
+ }
+
+ public void incSkippedReadUncompressLength(long skippedReadUncompressLength)
{
+ this.skippedReadUncompressLength += skippedReadUncompressLength;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ClientReadHandlerMetric that = (ClientReadHandlerMetric) o;
+ return readBlockNum == that.getReadBlockNum()
+ && readLength == that.getReadLength()
+ && readUncompressLength == that.getReadUncompressLength()
+ && skippedReadBlockNum == that.getSkippedReadBlockNum()
+ && skippedReadLength == that.getSkippedReadLength()
+ && skippedReadUncompressLength ==
that.getSkippedReadUncompressLength();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(readBlockNum, readLength, readUncompressLength,
+ skippedReadBlockNum, skippedReadLength, skippedReadUncompressLength);
+ }
+}
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ClientReadHandler.java
index ba018d3a..e206a29e 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ClientReadHandler.java
@@ -30,7 +30,7 @@ public interface ClientReadHandler {
// but does not know the actually consumed blocks,
// so the consumer should let the handler update statistics.
// Each type of handler can design their rules.
- void updateConsumedBlockInfo(BufferSegment bs);
+ void updateConsumedBlockInfo(BufferSegment bs, boolean isSkippedMetrics);
// Display the statistics of consumed blocks
void logConsumedBlockInfo();
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/AbstractClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/AbstractClientReadHandler.java
index b997b0ac..efddf337 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/AbstractClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/AbstractClientReadHandler.java
@@ -17,16 +17,21 @@
package org.apache.uniffle.storage.handler.impl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.storage.handler.ClientReadHandlerMetric;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
public abstract class AbstractClientReadHandler implements ClientReadHandler {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(AbstractClientReadHandler.class);
protected String appId;
protected int shuffleId;
protected int partitionId;
protected int readBufferSize;
+ protected ClientReadHandlerMetric readHandlerMetric = new
ClientReadHandlerMetric();
@Override
public ShuffleDataResult readShuffleData() {
@@ -38,10 +43,37 @@ public abstract class AbstractClientReadHandler implements
ClientReadHandler {
}
@Override
- public void updateConsumedBlockInfo(BufferSegment bs) {
+ public void updateConsumedBlockInfo(BufferSegment bs, boolean
isSkippedMetrics) {
+ if (bs == null) {
+ return;
+ }
+ updateBlockMetric(readHandlerMetric, bs, isSkippedMetrics);
}
@Override
public void logConsumedBlockInfo() {
+ LOG.info("Client read [" + readHandlerMetric.getReadBlockNum() + " blocks,"
+ + " bytes:" + readHandlerMetric.getReadLength() + " uncompressed
bytes:"
+ + readHandlerMetric.getReadUncompressLength()
+ + "], skipped[" + readHandlerMetric.getSkippedReadBlockNum() + "
blocks,"
+ + " bytes:" + readHandlerMetric.getSkippedReadLength() + "
uncompressed bytes:"
+ + readHandlerMetric.getSkippedReadUncompressLength() + "]");
+ }
+
+ protected void updateBlockMetric(ClientReadHandlerMetric metric,
BufferSegment bs, boolean isSkippedMetrics) {
+ if (isSkippedMetrics) {
+ metric.incSkippedReadBlockNum();
+ metric.incSkippedReadLength(bs.getLength());
+ metric.incSkippedReadUncompressLength(bs.getUncompressLength());
+ } else {
+ metric.incReadBlockNum();
+ metric.incReadLength(bs.getLength());
+ metric.incReadUncompressLength(bs.getUncompressLength());
+ }
}
+
+ public ClientReadHandlerMetric getReadHandlerMetric() {
+ return readHandlerMetric;
+ }
+
}
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
index da4e2b2f..ea3d0be3 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
@@ -26,7 +26,9 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.storage.handler.ClientReadHandlerMetric;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
/**
@@ -34,10 +36,11 @@ import
org.apache.uniffle.storage.handler.api.ClientReadHandler;
* The storage types reading order is as follows: HOT -> WARM -> COLD -> FROZEN
* @see <a
href="https://github.com/apache/incubator-uniffle/pull/276">PR-276</a>
*/
-public class ComposedClientReadHandler implements ClientReadHandler {
+public class ComposedClientReadHandler extends AbstractClientReadHandler {
private static final Logger LOG =
LoggerFactory.getLogger(ComposedClientReadHandler.class);
+ private final ShuffleServerInfo serverInfo;
private ClientReadHandler hotDataReadHandler;
private ClientReadHandler warmDataReadHandler;
private ClientReadHandler coldDataReadHandler;
@@ -49,26 +52,17 @@ public class ComposedClientReadHandler implements
ClientReadHandler {
private int currentHandler = HOT;
private final int topLevelOfHandler;
- private long hotReadBlockNum = 0L;
- private long warmReadBlockNum = 0L;
- private long coldReadBlockNum = 0L;
- private long frozenReadBlockNum = 0L;
+ private ClientReadHandlerMetric hostHandlerMetric = new
ClientReadHandlerMetric();
+ private ClientReadHandlerMetric warmHandlerMetric = new
ClientReadHandlerMetric();
+ private ClientReadHandlerMetric coldHandlerMetric = new
ClientReadHandlerMetric();
+ private ClientReadHandlerMetric frozenHandlerMetric = new
ClientReadHandlerMetric();
- private long hotReadLength = 0L;
- private long warmReadLength = 0L;
- private long coldReadLength = 0L;
- private long frozenReadLength = 0L;
-
- private long hotReadUncompressLength = 0L;
- private long warmReadUncompressLength = 0L;
- private long coldReadUncompressLength = 0L;
- private long frozenReadUncompressLength = 0L;
-
- public ComposedClientReadHandler(ClientReadHandler... handlers) {
- this(Lists.newArrayList(handlers));
+ public ComposedClientReadHandler(ShuffleServerInfo serverInfo,
ClientReadHandler... handlers) {
+ this(serverInfo, Lists.newArrayList(handlers));
}
- public ComposedClientReadHandler(List<ClientReadHandler> handlers) {
+ public ComposedClientReadHandler(ShuffleServerInfo serverInfo,
List<ClientReadHandler> handlers) {
+ this.serverInfo = serverInfo;
topLevelOfHandler = handlers.size();
if (topLevelOfHandler > 0) {
this.hotDataReadHandler = handlers.get(0);
@@ -162,30 +156,23 @@ public class ComposedClientReadHandler implements
ClientReadHandler {
}
@Override
- public void updateConsumedBlockInfo(BufferSegment bs) {
+ public void updateConsumedBlockInfo(BufferSegment bs, boolean
isSkippedMetrics) {
if (bs == null) {
return;
}
+ super.updateConsumedBlockInfo(bs, isSkippedMetrics);
switch (currentHandler) {
case HOT:
- hotReadBlockNum++;
- hotReadLength += bs.getLength();
- hotReadUncompressLength += bs.getUncompressLength();
+ updateBlockMetric(hostHandlerMetric, bs, isSkippedMetrics);
break;
case WARM:
- warmReadBlockNum++;
- warmReadLength += bs.getLength();
- warmReadUncompressLength += bs.getUncompressLength();
+ updateBlockMetric(warmHandlerMetric, bs, isSkippedMetrics);
break;
case COLD:
- coldReadBlockNum++;
- coldReadLength += bs.getLength();
- coldReadUncompressLength += bs.getUncompressLength();
+ updateBlockMetric(coldHandlerMetric, bs, isSkippedMetrics);
break;
case FROZEN:
- frozenReadBlockNum++;
- frozenReadLength += bs.getLength();
- frozenReadUncompressLength += bs.getUncompressLength();
+ updateBlockMetric(frozenHandlerMetric, bs, isSkippedMetrics);
break;
default:
break;
@@ -201,31 +188,44 @@ public class ComposedClientReadHandler implements
ClientReadHandler {
@VisibleForTesting
public String getReadBlokNumInfo() {
- long totalBlockNum = hotReadBlockNum + warmReadBlockNum
- + coldReadBlockNum + frozenReadBlockNum;
- return "Client read " + totalBlockNum + " blocks ["
- + " hot:" + hotReadBlockNum + " warm:" + warmReadBlockNum
- + " cold:" + coldReadBlockNum + " frozen:" + frozenReadBlockNum + " ]";
+ return "Client read " + readHandlerMetric.getReadBlockNum()
+ + " blocks from [" + serverInfo + "], Consumed["
+ + " hot:" + hostHandlerMetric.getReadBlockNum()
+ + " warm:" + warmHandlerMetric.getReadBlockNum()
+ + " cold:" + coldHandlerMetric.getReadBlockNum()
+ + " frozen:" + frozenHandlerMetric.getReadBlockNum()
+ + " ], Skipped[" + " hot:" + hostHandlerMetric.getSkippedReadBlockNum()
+ + " warm:" + warmHandlerMetric.getSkippedReadBlockNum()
+ + " cold:" + coldHandlerMetric.getSkippedReadBlockNum()
+ + " frozen:" + frozenHandlerMetric.getSkippedReadBlockNum() + " ]";
}
@VisibleForTesting
public String getReadLengthInfo() {
- long totalReadLength = hotReadLength + warmReadLength
- + coldReadLength + frozenReadLength;
- return "Client read " + totalReadLength + " bytes ["
- + " hot:" + hotReadLength + " warm:" + warmReadLength
- + " cold:" + coldReadLength + " frozen:" + frozenReadLength + " ]";
+ return "Client read " + readHandlerMetric.getReadLength()
+ + " bytes from [" + serverInfo + "], Consumed["
+ + " hot:" + hostHandlerMetric.getReadLength()
+ + " warm:" + warmHandlerMetric.getReadLength()
+ + " cold:" + coldHandlerMetric.getReadLength()
+ + " frozen:" + frozenHandlerMetric.getReadLength() + " ], Skipped["
+ + " hot:" + hostHandlerMetric.getSkippedReadLength()
+ + " warm:" + warmHandlerMetric.getSkippedReadLength()
+ + " cold:" + coldHandlerMetric.getSkippedReadLength()
+ + " frozen:" + frozenHandlerMetric.getSkippedReadLength() + " ]";
}
@VisibleForTesting
public String getReadUncompressLengthInfo() {
- long totalReadUncompressLength = hotReadUncompressLength +
warmReadUncompressLength
- + coldReadUncompressLength + frozenReadUncompressLength;
- return "Client read " + totalReadUncompressLength + " uncompressed bytes ["
- + " hot:" + hotReadUncompressLength
- + " warm:" + warmReadUncompressLength
- + " cold:" + coldReadUncompressLength
- + " frozen:" + frozenReadUncompressLength + " ]";
+ return "Client read " + readHandlerMetric.getReadUncompressLength()
+ + " uncompressed bytes from [" + serverInfo + "], Consumed["
+ + " hot:" + hostHandlerMetric.getReadUncompressLength()
+ + " warm:" + warmHandlerMetric.getReadUncompressLength()
+ + " cold:" + coldHandlerMetric.getReadUncompressLength()
+ + " frozen:" + frozenHandlerMetric.getReadUncompressLength() + " ],
Skipped["
+ + " hot:" + hostHandlerMetric.getSkippedReadUncompressLength()
+ + " warm:" + warmHandlerMetric.getSkippedReadUncompressLength()
+ + " cold:" + coldHandlerMetric.getSkippedReadUncompressLength()
+ + " frozen:" + frozenHandlerMetric.getSkippedReadUncompressLength() +
" ]";
}
}
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
index 1001c12c..59ddd9c5 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
@@ -29,7 +29,6 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
@@ -50,11 +49,6 @@ public class HdfsClientReadHandler extends
AbstractClientReadHandler {
protected final Configuration hadoopConf;
protected final List<HdfsShuffleReadHandler> readHandlers =
Lists.newArrayList();
private int readHandlerIndex;
-
- private long readBlockNum = 0L;
- private long readLength = 0L;
- private long readUncompressLength = 0L;
-
private ShuffleDataDistributionType distributionType;
private Roaring64NavigableMap expectTaskIds;
@@ -196,20 +190,4 @@ public class HdfsClientReadHandler extends
AbstractClientReadHandler {
protected int getReadHandlerIndex() {
return readHandlerIndex;
}
-
- @Override
- public void updateConsumedBlockInfo(BufferSegment bs) {
- if (bs == null) {
- return;
- }
- readBlockNum++;
- readLength += bs.getLength();
- readUncompressLength += bs.getUncompressLength();
- }
-
- @Override
- public void logConsumedBlockInfo() {
- LOG.info("Client read " + readBlockNum + " blocks,"
- + " bytes:" + readLength + " uncompressed bytes:" +
readUncompressLength);
- }
}
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
index 83b9fb7f..c2e766a9 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
@@ -36,10 +36,6 @@ public class MultiReplicaClientReadHandler extends
AbstractClientReadHandler {
private final List<ClientReadHandler> handlers;
private final List<ShuffleServerInfo> shuffleServerInfos;
-
- private long readBlockNum = 0L;
- private long readLength = 0L;
- private long readUncompressLength = 0L;
private final Roaring64NavigableMap blockIdBitmap;
private final Roaring64NavigableMap processedBlockIds;
@@ -87,18 +83,14 @@ public class MultiReplicaClientReadHandler extends
AbstractClientReadHandler {
}
@Override
- public void updateConsumedBlockInfo(BufferSegment bs) {
- if (bs == null) {
- return;
- }
- readBlockNum++;
- readLength += bs.getLength();
- readUncompressLength += bs.getUncompressLength();
+ public void updateConsumedBlockInfo(BufferSegment bs, boolean
isSkippedMetrics) {
+ super.updateConsumedBlockInfo(bs, isSkippedMetrics);
+ handlers.get(Math.max(readHandlerIndex, handlers.size() -
1)).updateConsumedBlockInfo(bs, isSkippedMetrics);
}
@Override
public void logConsumedBlockInfo() {
- LOG.info("Client read " + readBlockNum + " blocks,"
- + " bytes:" + readLength + " uncompressed bytes:" +
readUncompressLength);
+ super.logConsumedBlockInfo();
+ handlers.forEach(ClientReadHandler::logConsumedBlockInfo);
}
}