[GitHub] [spark] otterc commented on a change in pull request #31643: [SPARK-34534] Fix blockIds order when use FetchShuffleBlocks to fetch blocks
otterc commented on a change in pull request #31643:
URL: https://github.com/apache/spark/pull/31643#discussion_r584028482

## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

@@ -117,17 +117,23 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
     boolean batchFetchEnabled = firstBlock.length == 5;
     HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
+    ArrayList<Long> orderedMapId = new ArrayList<>();
     for (String blockId : blockIds) {
       String[] blockIdParts = splitBlockId(blockId);
       if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
         throw new IllegalArgumentException("Expected shuffleId=" + shuffleId + ", got:" + blockId);
       }
       long mapId = Long.parseLong(blockIdParts[2]);
+      assert(orderedMapId.isEmpty() || mapId >= orderedMapId.get(orderedMapId.size() - 1));

Review comment:
The initial fix that was proposed was to create the member `blockIds` in OneForOneBlockFetcher in accordance with the FetchShuffleBlocks message, which should work even if the blockIds are in random order.

---
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] otterc commented on a change in pull request #31643: [SPARK-34534] Fix blockIds order when use FetchShuffleBlocks to fetch blocks
otterc commented on a change in pull request #31643:
URL: https://github.com/apache/spark/pull/31643#discussion_r584024039

## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

@@ -117,17 +117,23 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
     boolean batchFetchEnabled = firstBlock.length == 5;
     HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
+    ArrayList<Long> orderedMapId = new ArrayList<>();
     for (String blockId : blockIds) {
       String[] blockIdParts = splitBlockId(blockId);
       if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
         throw new IllegalArgumentException("Expected shuffleId=" + shuffleId + ", got:" + blockId);
       }
       long mapId = Long.parseLong(blockIdParts[2]);
+      assert(orderedMapId.isEmpty() || mapId >= orderedMapId.get(orderedMapId.size() - 1));

Review comment:
> the order of blockIds could be random

Well, this PR is adding a validation that checks that the mapIds in `blockIds` are in increasing order, so the order of `blockIds` cannot be random:
`assert(orderedMapId.isEmpty() || mapId >= orderedMapId.get(orderedMapId.size() - 1));`
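For reference, the validation being debated amounts to a monotonic check over the mapIds parsed from `blockIds`. Below is a minimal standalone sketch of that check (the class, method name, and sample inputs are illustrative, not the PR's actual code):

```java
import java.util.ArrayList;

public class MonotonicMapIdCheck {

  // Returns true iff mapIds appear in non-decreasing order, mirroring
  // the proposed assert: empty list, or current id >= last id seen.
  static boolean isNonDecreasing(long[] mapIds) {
    ArrayList<Long> seen = new ArrayList<>();
    for (long mapId : mapIds) {
      if (!(seen.isEmpty() || mapId >= seen.get(seen.size() - 1))) {
        return false;
      }
      seen.add(mapId);
    }
    return true;
  }

  public static void main(String[] args) {
    // Duplicates are allowed (>=), only a decrease fails the check.
    System.out.println(isNonDecreasing(new long[]{1, 2, 2, 5})); // true
    System.out.println(isNonDecreasing(new long[]{3, 1}));       // false
  }
}
```

This is exactly why the reviewer objects: the check rejects any caller that passes blockIds out of mapId order, a constraint `OneForOneBlockFetcher` never imposed before.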
[GitHub] [spark] otterc commented on a change in pull request #31643: [SPARK-34534] Fix blockIds order when use FetchShuffleBlocks to fetch blocks
otterc commented on a change in pull request #31643:
URL: https://github.com/apache/spark/pull/31643#discussion_r584021590

## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

@@ -117,17 +117,23 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
     boolean batchFetchEnabled = firstBlock.length == 5;
     HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
+    ArrayList<Long> orderedMapId = new ArrayList<>();
     for (String blockId : blockIds) {
       String[] blockIdParts = splitBlockId(blockId);
       if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
         throw new IllegalArgumentException("Expected shuffleId=" + shuffleId + ", got:" + blockId);
       }
       long mapId = Long.parseLong(blockIdParts[2]);
+      assert(orderedMapId.isEmpty() || mapId >= orderedMapId.get(orderedMapId.size() - 1));

Review comment:
My point is that the order of blockIds has been irrelevant for `OneForOneBlockFetcher` so far. At this layer it has no dependency on the order of block Ids, so why introduce one at this layer now?
[GitHub] [spark] otterc commented on a change in pull request #31643: [SPARK-34534] Fix blockIds order when use FetchShuffleBlocks to fetch blocks
otterc commented on a change in pull request #31643:
URL: https://github.com/apache/spark/pull/31643#discussion_r583802260

## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

@@ -136,11 +142,12 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
         mapIdToReduceIds.get(mapId).add(Integer.parseInt(blockIdParts[4]));
       }
     }
-    long[] mapIds = Longs.toArray(mapIdToReduceIds.keySet());
+    long[] mapIds = Longs.toArray(orderedMapId);

Review comment:
ok. In that case, we could recompute the member `blockIds` to be in sync with `FetchShuffleBlocks`. I think that was the initial fix.
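The "recompute the member `blockIds`" idea can be sketched as follows. This is a hedged illustration, not the actual fix: the helper name, the simplified `shuffle_<shuffleId>_<mapId>_<reduceId>` id format, and the sample inputs are all assumptions for the sake of the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RebuildBlockIds {

  // Regroups block ids by mapId and rebuilds the array in the order a
  // FetchShuffleBlocks-style message would enumerate them (all reduceIds
  // of one mapId together), so the two stay in sync even when the
  // caller-supplied order was arbitrary.
  static String[] rebuild(int shuffleId, String[] blockIds) {
    Map<Long, List<Integer>> mapIdToReduceIds = new LinkedHashMap<>();
    for (String blockId : blockIds) {
      String[] parts = blockId.split("_");
      long mapId = Long.parseLong(parts[2]);
      mapIdToReduceIds.computeIfAbsent(mapId, k -> new ArrayList<>())
          .add(Integer.parseInt(parts[3]));
    }
    List<String> ordered = new ArrayList<>();
    for (Map.Entry<Long, List<Integer>> e : mapIdToReduceIds.entrySet()) {
      for (int reduceId : e.getValue()) {
        ordered.add("shuffle_" + shuffleId + "_" + e.getKey() + "_" + reduceId);
      }
    }
    return ordered.toArray(new String[0]);
  }
}
```

With input `{"shuffle_0_5_1", "shuffle_0_2_3", "shuffle_0_5_2"}` the rebuilt array groups the two mapId-5 blocks together before the mapId-2 block, matching the grouped layout of the outgoing message.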
[GitHub] [spark] otterc commented on a change in pull request #31643: [SPARK-34534] Fix blockIds order when use FetchShuffleBlocks to fetch blocks
otterc commented on a change in pull request #31643:
URL: https://github.com/apache/spark/pull/31643#discussion_r583791753

## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

@@ -136,11 +142,12 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
         mapIdToReduceIds.get(mapId).add(Integer.parseInt(blockIdParts[4]));
       }
     }
-    long[] mapIds = Longs.toArray(mapIdToReduceIds.keySet());
+    long[] mapIds = Longs.toArray(orderedMapId);

Review comment:
That's right, so we can just use a LinkedHashMap and not maintain a separate arrayList of mapIds.
[GitHub] [spark] otterc commented on a change in pull request #31643: [SPARK-34534] Fix blockIds order when use FetchShuffleBlocks to fetch blocks
otterc commented on a change in pull request #31643:
URL: https://github.com/apache/spark/pull/31643#discussion_r583790528

## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

@@ -117,17 +117,23 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
     boolean batchFetchEnabled = firstBlock.length == 5;
     HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
+    ArrayList<Long> orderedMapId = new ArrayList<>();
     for (String blockId : blockIds) {
       String[] blockIdParts = splitBlockId(blockId);
       if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
         throw new IllegalArgumentException("Expected shuffleId=" + shuffleId + ", got:" + blockId);
       }
       long mapId = Long.parseLong(blockIdParts[2]);
+      assert(orderedMapId.isEmpty() || mapId >= orderedMapId.get(orderedMapId.size() - 1));

Review comment:
I actually don't think that the bug reveals that the order of blockIds is important. `OneForOneBlockFetcher` never expects the provided `blockIds` to be in any particular order. Besides, why should it care about the order at this layer? It is given an array of blockIds, which may be in any order, and fetches them. IMO, the bug is just that the member `blockIds` of OneForOneBlockFetcher is out of sync with the FetchShuffleBlocks.
[GitHub] [spark] otterc commented on a change in pull request #31643: [SPARK-34534] Fix blockIds order when use FetchShuffleBlocks to fetch blocks
otterc commented on a change in pull request #31643:
URL: https://github.com/apache/spark/pull/31643#discussion_r583382485

## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

@@ -117,17 +117,23 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
     boolean batchFetchEnabled = firstBlock.length == 5;
     HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
+    ArrayList<Long> orderedMapId = new ArrayList<>();
     for (String blockId : blockIds) {
       String[] blockIdParts = splitBlockId(blockId);
       if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
         throw new IllegalArgumentException("Expected shuffleId=" + shuffleId + ", got:" + blockId);
       }
       long mapId = Long.parseLong(blockIdParts[2]);
+      assert(orderedMapId.isEmpty() || mapId >= orderedMapId.get(orderedMapId.size() - 1));

Review comment:
Why is this needed? `mapId >= orderedMapId.get(orderedMapId.size() - 1)` validates that the `blockIds` are in increasing order of mapIds, but this validation didn't exist earlier, so why is it being added now in reference to this bug?
## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

@@ -117,17 +117,23 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
     boolean batchFetchEnabled = firstBlock.length == 5;
     HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
+    ArrayList<Long> orderedMapId = new ArrayList<>();
     for (String blockId : blockIds) {
       String[] blockIdParts = splitBlockId(blockId);
       if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
         throw new IllegalArgumentException("Expected shuffleId=" + shuffleId + ", got:" + blockId);
       }
       long mapId = Long.parseLong(blockIdParts[2]);
+      assert(orderedMapId.isEmpty() || mapId >= orderedMapId.get(orderedMapId.size() - 1));
       if (!mapIdToReduceIds.containsKey(mapId)) {
         mapIdToReduceIds.put(mapId, new ArrayList<>());
+        orderedMapId.add(mapId);
       }
-      mapIdToReduceIds.get(mapId).add(Integer.parseInt(blockIdParts[3]));
+      ArrayList<Integer> reduceIdsByMapId = mapIdToReduceIds.get(mapId);
+      int reduceId = Integer.parseInt(blockIdParts[3]);
+      assert(reduceIdsByMapId.isEmpty() || reduceId > reduceIdsByMapId.get(reduceIdsByMapId.size() - 1));

Review comment:
Same here, why is the validation `reduceId > reduceIdsByMapId.get(reduceIdsByMapId.size() - 1)` needed now in reference to this bug?

## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

@@ -136,11 +142,12 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
         mapIdToReduceIds.get(mapId).add(Integer.parseInt(blockIdParts[4]));
       }
     }
-    long[] mapIds = Longs.toArray(mapIdToReduceIds.keySet());
+    long[] mapIds = Longs.toArray(orderedMapId);

Review comment:
If we use a LinkedHashMap for `mapIdToReduceIds`, then `mapIdToReduceIds.keySet()` will return the mapIds in the same order as they were inserted, so the order will be the same as in blockIds.
## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

@@ -117,17 +117,23 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
     boolean batchFetchEnabled = firstBlock.length == 5;
     HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
+    ArrayList<Long> orderedMapId = new ArrayList<>();

Review comment:
Why can't we just change `mapIdToReduceIds` to a LinkedHashMap?
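The LinkedHashMap suggestion rests on a documented property of `java.util.LinkedHashMap`: its iteration order (including `keySet()`) is the order in which keys were first inserted. A minimal standalone sketch of how that removes the need for a separate `orderedMapId` list (the block-id values and the simplified id format are made up for illustration, not Spark's actual code):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;

public class LinkedHashMapOrderDemo {
  public static void main(String[] args) {
    // Hypothetical block ids in the order the caller supplied them
    // (format: shuffle_<shuffleId>_<mapId>_<reduceId>).
    String[] blockIds = {"shuffle_0_5_1", "shuffle_0_2_3", "shuffle_0_5_2"};

    // LinkedHashMap iterates keys in insertion order, so keySet()
    // yields mapIds in the order they first appear in blockIds,
    // with no separate ArrayList needed to track that order.
    LinkedHashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new LinkedHashMap<>();
    for (String blockId : blockIds) {
      String[] parts = blockId.split("_");
      long mapId = Long.parseLong(parts[2]);
      mapIdToReduceIds.computeIfAbsent(mapId, k -> new ArrayList<>())
          .add(Integer.parseInt(parts[3]));
    }

    // Prints [5, 2]: first-appearance order, not sorted order.
    System.out.println(new ArrayList<>(mapIdToReduceIds.keySet()));
  }
}
```

A plain `HashMap` gives no ordering guarantee for `keySet()`, which is why the diff under review had to carry the extra `orderedMapId` list alongside it.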