[GitHub] [spark] otterc commented on a change in pull request #31643: [SPARK-34534] Fix blockIds order when use FetchShuffleBlocks to fetch blocks

2021-02-26 Thread GitBox


otterc commented on a change in pull request #31643:
URL: https://github.com/apache/spark/pull/31643#discussion_r584028482



##
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
##
@@ -117,17 +117,23 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
 boolean batchFetchEnabled = firstBlock.length == 5;
 
 HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
+ArrayList<Long> orderedMapId = new ArrayList<>();
 for (String blockId : blockIds) {
   String[] blockIdParts = splitBlockId(blockId);
   if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
 throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
   ", got:" + blockId);
   }
   long mapId = Long.parseLong(blockIdParts[2]);
+  assert(orderedMapId.isEmpty() || mapId >= orderedMapId.get(orderedMapId.size() - 1));

Review comment:
   The initial fix that was proposed was to build the member `blockIds` of 
OneForOneBlockFetcher in accordance with the FetchShuffleBlocks message, which 
should work even if the blockIds are in random order.
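
   A rough sketch of that proposed shape, purely as a hypothetical illustration 
and not the actual OneForOneBlockFetcher code: regenerate the blockId array in 
exactly the order encoded by a FetchShuffleBlocks-style message (each 
`mapIds[i]` paired with its `reduceIds[i]`), here assuming Spark's 
`shuffle_<shuffleId>_<mapId>_<reduceId>` block naming:

```java
import java.util.ArrayList;
import java.util.List;

public class RebuildBlockIds {
  // Hypothetical sketch: rebuild blockIds in the order implied by a
  // FetchShuffleBlocks-style message, so the fetcher's member array and
  // the message can never disagree. Not the actual Spark implementation.
  static List<String> rebuildBlockIds(int shuffleId, long[] mapIds, int[][] reduceIds) {
    List<String> ordered = new ArrayList<>();
    for (int i = 0; i < mapIds.length; i++) {
      for (int reduceId : reduceIds[i]) {
        ordered.add("shuffle_" + shuffleId + "_" + mapIds[i] + "_" + reduceId);
      }
    }
    return ordered;
  }

  public static void main(String[] args) {
    // mapId 2 with reduce partitions 0 and 1, then mapId 7 with partition 0
    System.out.println(rebuildBlockIds(0, new long[]{2L, 7L}, new int[][]{{0, 1}, {0}}));
    // [shuffle_0_2_0, shuffle_0_2_1, shuffle_0_7_0]
  }
}
```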





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] otterc commented on a change in pull request #31643: [SPARK-34534] Fix blockIds order when use FetchShuffleBlocks to fetch blocks

2021-02-26 Thread GitBox


otterc commented on a change in pull request #31643:
URL: https://github.com/apache/spark/pull/31643#discussion_r584024039



##
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
##
@@ -117,17 +117,23 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
 boolean batchFetchEnabled = firstBlock.length == 5;
 
 HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
+ArrayList<Long> orderedMapId = new ArrayList<>();
 for (String blockId : blockIds) {
   String[] blockIdParts = splitBlockId(blockId);
   if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
 throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
   ", got:" + blockId);
   }
   long mapId = Long.parseLong(blockIdParts[2]);
+  assert(orderedMapId.isEmpty() || mapId >= orderedMapId.get(orderedMapId.size() - 1));

Review comment:
   > the order of blockIds could be random
   
   Well, this PR adds a validation that checks that the mapIds in `blockIds` 
are in increasing order, so the order of `blockIds` cannot be random:
   `assert(orderedMapId.isEmpty() || mapId >= orderedMapId.get(orderedMapId.size() - 1));`
   








[GitHub] [spark] otterc commented on a change in pull request #31643: [SPARK-34534] Fix blockIds order when use FetchShuffleBlocks to fetch blocks

2021-02-26 Thread GitBox


otterc commented on a change in pull request #31643:
URL: https://github.com/apache/spark/pull/31643#discussion_r584021590



##
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
##
@@ -117,17 +117,23 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
 boolean batchFetchEnabled = firstBlock.length == 5;
 
 HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
+ArrayList<Long> orderedMapId = new ArrayList<>();
 for (String blockId : blockIds) {
   String[] blockIdParts = splitBlockId(blockId);
   if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
 throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
   ", got:" + blockId);
   }
   long mapId = Long.parseLong(blockIdParts[2]);
+  assert(orderedMapId.isEmpty() || mapId >= orderedMapId.get(orderedMapId.size() - 1));

Review comment:
   My point is that the order of blockIds has been irrelevant to 
`OneForOneBlockFetcher` so far. This layer has no dependency on the order of 
block Ids, so why introduce one here now?
   








[GitHub] [spark] otterc commented on a change in pull request #31643: [SPARK-34534] Fix blockIds order when use FetchShuffleBlocks to fetch blocks

2021-02-26 Thread GitBox


otterc commented on a change in pull request #31643:
URL: https://github.com/apache/spark/pull/31643#discussion_r583802260



##
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
##
@@ -136,11 +142,12 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
 mapIdToReduceIds.get(mapId).add(Integer.parseInt(blockIdParts[4]));
   }
 }
-long[] mapIds = Longs.toArray(mapIdToReduceIds.keySet());
+long[] mapIds = Longs.toArray(orderedMapId);

Review comment:
   Ok. In that case, we could recompute the member `blockIds` to be in sync 
with `FetchShuffleBlocks`. I think that was the initial fix.









[GitHub] [spark] otterc commented on a change in pull request #31643: [SPARK-34534] Fix blockIds order when use FetchShuffleBlocks to fetch blocks

2021-02-26 Thread GitBox


otterc commented on a change in pull request #31643:
URL: https://github.com/apache/spark/pull/31643#discussion_r583791753



##
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
##
@@ -136,11 +142,12 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
 mapIdToReduceIds.get(mapId).add(Integer.parseInt(blockIdParts[4]));
   }
 }
-long[] mapIds = Longs.toArray(mapIdToReduceIds.keySet());
+long[] mapIds = Longs.toArray(orderedMapId);

Review comment:
   That's right, so we can just use a LinkedHashMap and not maintain a 
separate arrayList of mapIds.








[GitHub] [spark] otterc commented on a change in pull request #31643: [SPARK-34534] Fix blockIds order when use FetchShuffleBlocks to fetch blocks

2021-02-26 Thread GitBox


otterc commented on a change in pull request #31643:
URL: https://github.com/apache/spark/pull/31643#discussion_r583790528



##
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
##
@@ -117,17 +117,23 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
 boolean batchFetchEnabled = firstBlock.length == 5;
 
 HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
+ArrayList<Long> orderedMapId = new ArrayList<>();
 for (String blockId : blockIds) {
   String[] blockIdParts = splitBlockId(blockId);
   if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
 throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
   ", got:" + blockId);
   }
   long mapId = Long.parseLong(blockIdParts[2]);
+  assert(orderedMapId.isEmpty() || mapId >= orderedMapId.get(orderedMapId.size() - 1));

Review comment:
   I actually don't think the bug shows that the order of blockIds is 
important. `OneForOneBlockFetcher` has never expected the provided `blockIds` 
to be in any particular order, and why should it care about the order at this 
layer? It is given an array of blockIds, which may be in any order, and 
fetches them.
   IMO, the bug is just that the member `blockIds` of OneForOneBlockFetcher is 
out of sync with the FetchShuffleBlocks message.












[GitHub] [spark] otterc commented on a change in pull request #31643: [SPARK-34534] Fix blockIds order when use FetchShuffleBlocks to fetch blocks

2021-02-25 Thread GitBox


otterc commented on a change in pull request #31643:
URL: https://github.com/apache/spark/pull/31643#discussion_r583382485



##
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
##
@@ -117,17 +117,23 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
 boolean batchFetchEnabled = firstBlock.length == 5;
 
 HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
+ArrayList<Long> orderedMapId = new ArrayList<>();
 for (String blockId : blockIds) {
   String[] blockIdParts = splitBlockId(blockId);
   if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
 throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
   ", got:" + blockId);
   }
   long mapId = Long.parseLong(blockIdParts[2]);
+  assert(orderedMapId.isEmpty() || mapId >= orderedMapId.get(orderedMapId.size() - 1));

Review comment:
   Why is this needed? `mapId >= orderedMapId.get(orderedMapId.size() - 1)` 
validates that the `blockIds` are in increasing order of mapIds, but this 
validation didn't exist earlier, so why is it being added now in reference to 
this bug?

##
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
##
@@ -117,17 +117,23 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
 boolean batchFetchEnabled = firstBlock.length == 5;
 
 HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
+ArrayList<Long> orderedMapId = new ArrayList<>();
 for (String blockId : blockIds) {
   String[] blockIdParts = splitBlockId(blockId);
   if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
 throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
   ", got:" + blockId);
   }
   long mapId = Long.parseLong(blockIdParts[2]);
+  assert(orderedMapId.isEmpty() || mapId >= orderedMapId.get(orderedMapId.size() - 1));
   if (!mapIdToReduceIds.containsKey(mapId)) {
 mapIdToReduceIds.put(mapId, new ArrayList<>());
+orderedMapId.add(mapId);
   }
-  mapIdToReduceIds.get(mapId).add(Integer.parseInt(blockIdParts[3]));
+  ArrayList<Integer> reduceIdsByMapId = mapIdToReduceIds.get(mapId);
+  int reduceId = Integer.parseInt(blockIdParts[3]);
+  assert(reduceIdsByMapId.isEmpty() || reduceId > reduceIdsByMapId.get(reduceIdsByMapId.size() - 1));

Review comment:
   Same here: why is this validation, `reduceId > 
reduceIdsByMapId.get(reduceIdsByMapId.size() - 1)`, needed now in reference to 
this bug?

##
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
##
@@ -136,11 +142,12 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
 mapIdToReduceIds.get(mapId).add(Integer.parseInt(blockIdParts[4]));
   }
 }
-long[] mapIds = Longs.toArray(mapIdToReduceIds.keySet());
+long[] mapIds = Longs.toArray(orderedMapId);

Review comment:
   If we use a LinkedHashMap for `mapIdToReduceIds`, then 
`mapIdToReduceIds.keySet()` will return the mapIds in the order they were 
inserted, so the order will be the same as in blockIds.
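
   A minimal standalone illustration of that property (not Spark code): 
LinkedHashMap iterates its keys in insertion order, so `keySet()` reflects the 
order in which mapIds were first seen; the mapId values below are made up:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;

public class LinkedHashMapOrderDemo {
  public static void main(String[] args) {
    // Unlike HashMap, LinkedHashMap iterates keys in insertion order,
    // so keySet() mirrors the order in which mapIds were first seen.
    Map<Long, ArrayList<Integer>> mapIdToReduceIds = new LinkedHashMap<>();
    long[] incomingMapIds = {7L, 2L, 7L, 5L};  // hypothetical mapIds parsed from blockIds
    for (long mapId : incomingMapIds) {
      mapIdToReduceIds.computeIfAbsent(mapId, k -> new ArrayList<>());
    }
    System.out.println(mapIdToReduceIds.keySet());  // [7, 2, 5]
  }
}
```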

##
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
##
@@ -117,17 +117,23 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
 boolean batchFetchEnabled = firstBlock.length == 5;
 
 HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
+ArrayList<Long> orderedMapId = new ArrayList<>();

Review comment:
   Why can't we just change `mapIdToReduceIds` to a LinkedHashMap?




