Ngone51 commented on a change in pull request #29855:
URL: https://github.com/apache/spark/pull/29855#discussion_r494290466



##########
File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();
+      return 4 + b.serializedSizeInBytes();
+    }
+
+    public static void encode(ByteBuf buf, RoaringBitmap b) {
+      ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());
+      try {
+        b.serialize(new DataOutputStream(new OutputStream() {

Review comment:
       The `RoaringBitmap` class also has a `serialize(ByteBuffer buffer)` API. Why not use it?
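   
   Something like this would avoid the `DataOutputStream`/`OutputStream` adapter entirely (just a sketch, assuming the 4-byte length prefix implied by `encodedLength`):

```java
import java.nio.ByteBuffer;

import io.netty.buffer.ByteBuf;
import org.roaringbitmap.RoaringBitmap;

class BitmapEncodeSketch {
  static void encode(ByteBuf buf, RoaringBitmap b) {
    // Assumes encodedLength() was called first, so trim()/runOptimize() have already run
    // and serializedSizeInBytes() here matches the declared length.
    int size = b.serializedSizeInBytes();
    buf.writeInt(size);
    ByteBuffer out = ByteBuffer.allocate(size);
    b.serialize(out);  // ByteBuffer-based overload; advances the buffer's position
    out.flip();
    buf.writeBytes(out);
  }
}
```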

##########
File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();
+      return 4 + b.serializedSizeInBytes();
+    }
+
+    public static void encode(ByteBuf buf, RoaringBitmap b) {
+      ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());

Review comment:
       Does this call to `serializedSizeInBytes` assume that `trim` and `runOptimize` have already been called? If so, please add a comment to clarify it.

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+// Needed by ScalaDoc. See SPARK-7726
+import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+
+/**
+ * Request to push a block to a remote shuffle service to be merged in push based shuffle.

Review comment:
       Looking at `BlockPushCallback`, it seems that a `PushBlockStream` can also be sent from the remote shuffle service to the pusher?

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -43,6 +48,9 @@
  * (via BlockTransferService), which has the downside of losing the data if we lose the executors.
  */
 public class ExternalBlockStoreClient extends BlockStoreClient {
+  private static final Logger logger = LoggerFactory.getLogger(ExternalBlockStoreClient.class);

Review comment:
       The abstract class `BlockStoreClient` already initializes a `logger`. Why do we need a new one here?
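   
   Just to illustrate the pattern (a generic, self-contained sketch with made-up class names, not the actual Spark classes): the base class creates the logger once and subclasses simply reuse it:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical names, for illustration only.
abstract class BaseClient {
  // Logger created once in the base class, keyed to the concrete subclass.
  protected final Logger logger = LoggerFactory.getLogger(this.getClass());
}

class ConcreteClient extends BaseClient {
  // No extra private static Logger field needed here.
  void doWork() {
    logger.info("logging via the logger inherited from the base class");
  }
}
```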

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/MergeStatuses.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import java.util.Arrays;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.roaringbitmap.RoaringBitmap;
+
+import org.apache.spark.network.protocol.Encoders;
+
+
+/**
+ * Result returned by an ExternalShuffleService to a scheduler. This represents the result

Review comment:
       nit: `to a scheduler` -> `to the DAGScheduler`?

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+
+/**
+ * Request to notify external shuffle service to finalize merge for a given shuffle.

Review comment:
       Maybe "Request from DAGScheduler to"? 
   
   Since this is only a part of the implementation and no one sends it, I think 
it would be good for reviewers to know who's the sender.
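   
   For example, something along these lines (the wording is only a suggestion):

```java
/**
 * Request from the DAGScheduler to the external shuffle service, asking it to
 * finalize the merge for a given shuffle.
 */
```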

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -156,6 +193,20 @@ protected void handleMessage(
      Map<String, String[]> localDirs = blockManager.getLocalDirs(msg.appId, msg.execIds);
       callback.onSuccess(new LocalDirsForExecutors(localDirs).toByteBuffer());
 
+    } else if (msgObj instanceof FinalizeShuffleMerge) {
+      final Timer.Context responseDelayContext =
+          metrics.finalizeShuffleMergeLatencyMillis.time();
+      FinalizeShuffleMerge msg = (FinalizeShuffleMerge) msgObj;
+      try {
+        checkAuth(client, msg.appId);
+        MergeStatuses statuses = mergeManager.finalizeShuffleMerge(msg);
+        callback.onSuccess(statuses.toByteBuffer());
+      } catch(IOException e) {
+        throw new RuntimeException(String.format("Error while finalizing shuffle merge "
+            + "for application %s shuffle %d", msg.appId, msg.shuffleId));

Review comment:
       I think we should respond to the DAGScheduler with an error instead of throwing an exception here, so that the pending finalize future on the DAGScheduler side can complete quickly.
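   
   Roughly something like this (sketch only; I'm assuming `callback` here is the usual `RpcResponseCallback`, which has an `onFailure(Throwable)` method):

```java
} catch (IOException e) {
  // Fail the RPC instead of throwing, so the pending finalize future on the
  // DAGScheduler side completes (exceptionally) right away.
  callback.onFailure(new RuntimeException(String.format(
      "Error while finalizing shuffle merge for application %s shuffle %d",
      msg.appId, msg.shuffleId), e));
}
```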

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+// Needed by ScalaDoc. See SPARK-7726
+import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+
+/**
+ * Request to push a block to a remote shuffle service to be merged in push based shuffle.
+ */
+public class PushBlockStream extends BlockTransferMessage {
+  public final String appId;
+  public final String blockId;
+  public final int index;

Review comment:
       What does `index` mean here? Could you add some comments to explain?

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -373,6 +427,54 @@ public ManagedBuffer next() {
     }
   }
 
+  /**
+   * Dummy implementation of merged shuffle file manager. Suitable for when push-based shuffle
+   * is not enabled.
+   */
+  private static class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
+
+    @Override
+    public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
+      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+    }
+
+    @Override
+    public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
+      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+    }
+
+    @Override
+    public void registerApplication(String appId, String user) {

Review comment:
       Why do we need `user` here? Is it a system user or something else?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
