This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 4465a9229 [CELEBORN-1048][FOLLOWUP] MR module compile
4465a9229 is described below
commit 4465a9229b94ab683b8fb5b6e4e13aafd5996b38
Author: sychen <[email protected]>
AuthorDate: Sat Nov 4 20:21:47 2023 +0800
[CELEBORN-1048][FOLLOWUP] MR module compile
### What changes were proposed in this pull request?
Let the MR module compile successfully.
### Why are the changes needed?
PR #2000 added parameters to the `ShuffleClient#readPartition` method,
which broke compilation of the MR module.
There is still no CI job covering the MR module.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
local test
```bash
./build/make-distribution.sh -Pmr
```
Closes #2069 from cxzl25/CELEBORN-1048-FOLLOWUP.
Authored-by: sychen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../mapreduce/task/reduce/CelebornShuffleConsumer.java | 17 ++++++++++++++++-
.../java/org/apache/celeborn/client/ShuffleClient.java | 1 +
2 files changed, 17 insertions(+), 1 deletion(-)
diff --git a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java
index b3292b600..ee48e3a30 100644
--- a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java
+++ b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.client.read.CelebornInputStream;
+import org.apache.celeborn.client.read.MetricsCallback;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.reflect.DynConstructors;
@@ -129,9 +130,23 @@ public class CelebornShuffleConsumer<K, V>
reduceId.getTaskID().getId(),
reduceId.getId());
+ MetricsCallback metricsCallback =
+ new MetricsCallback() {
+ @Override
+ public void incBytesRead(long bytesRead) {}
+
+ @Override
+ public void incReadTime(long time) {}
+ };
+
CelebornInputStream shuffleInputStream =
shuffleClient.readPartition(
- 0, reduceId.getTaskID().getId(), reduceId.getId(), 0, Integer.MAX_VALUE);
+ 0,
+ reduceId.getTaskID().getId(),
+ reduceId.getId(),
+ 0,
+ Integer.MAX_VALUE,
+ metricsCallback);
CelebornShuffleFetcher<K, V> shuffleReader =
new CelebornShuffleFetcher(
reduceId, taskStatus, merger, copyPhase, reporter, metrics, shuffleInputStream);
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index 22318e542..b4b566d1a 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -188,6 +188,7 @@ public abstract class ShuffleClient {
* to read all partition data
* @param endMapIndex the index of end map index of interested map range, set to
* `Integer.MAX_VALUE` if you want to read all partition data
+ * @param metricsCallback callback to report metrics
* @return
* @throws IOException
*/