[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-13 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1070230669


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -366,8 +359,35 @@ class BlockManagerMasterEndpoint(
        }
      }.getOrElse(Seq.empty)
 
+    val removeShuffleMergeFromShuffleServicesFutures =
+      externalBlockStoreClient.map { shuffleClient =>
+        val mergerLocations =
+          if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+            mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+          } else {
+            Seq.empty[BlockManagerId]
+          }
+        mergerLocations.map { bmId =>
+          Future[Boolean] {
+            shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId,
+              RemoteBlockPushResolver.DELETE_ALL_MERGED_SHUFFLE)
+          }
+        }
+      }.getOrElse(Seq.empty)
+
+    val removeMsg = RemoveShuffle(shuffleId)
+    val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
+      bm.storageEndpoint.ask[Boolean](removeMsg).recover {
+        // use false as default value means no shuffle data were removed
+        handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+      }
+    }.toSeq
+    if (testing) {
+      RpcUtils.INFINITE_TIMEOUT.awaitResult(Future.sequence(removeShuffleFromExecutorsFutures))

Review Comment:
   This is from my branch - so my fault actually :-)






[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-13 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1070212293


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -366,8 +359,35 @@ class BlockManagerMasterEndpoint(
        }
      }.getOrElse(Seq.empty)
 
+    val removeShuffleMergeFromShuffleServicesFutures =
+      externalBlockStoreClient.map { shuffleClient =>
+        val mergerLocations =
+          if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+            mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+          } else {
+            Seq.empty[BlockManagerId]
+          }
+        mergerLocations.map { bmId =>
+          Future[Boolean] {
+            shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId,
+              RemoteBlockPushResolver.DELETE_ALL_MERGED_SHUFFLE)
+          }
+        }
+      }.getOrElse(Seq.empty)
+
+    val removeMsg = RemoveShuffle(shuffleId)
+    val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
+      bm.storageEndpoint.ask[Boolean](removeMsg).recover {
+        // use false as default value means no shuffle data were removed
+        handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+      }
+    }.toSeq
+    if (testing) {
+      RpcUtils.INFINITE_TIMEOUT.awaitResult(Future.sequence(removeShuffleFromExecutorsFutures))

Review Comment:
   I think this is probably causing the test failures?
   Since we don't need it for this codebase anymore, we can remove it (the `testing` variable and this block here).






[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-13 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1069857650


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java:
##
@@ -256,6 +256,23 @@ public void onFailure(Throwable e) {
     }
   }
 
+  @Override
+  public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) {
+    checkInit();
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      client.send(
+          new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, shuffleMergeId)
+              .toByteBuffer());
+      // TODO(SPARK-42025): Add some error logs for RemoveShuffleMerge RPC.
+    } catch (Exception e) {
+      logger.error("Exception while sending RemoveShuffleMerge request to {}:{}",

Review Comment:
   We can revisit that in 
[SPARK-42025](https://issues.apache.org/jira/browse/SPARK-42025).
   Here, let us simply change the `logger.error` in the `catch` to `logger.debug` for now - since this is a best-effort attempt to remove files from an external service.
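   For illustration, a minimal sketch of the logging level being suggested - this is not the actual Spark patch; an SLF4J logger is assumed and `sendRemoveShuffleMerge` is a hypothetical stand-in for the real transport call:
   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   class BestEffortRemoveSketch {
     private static final Logger logger = LoggerFactory.getLogger(BestEffortRemoveSketch.class);

     boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) {
       try {
         // Placeholder for the real RemoveShuffleMerge RPC over the transport client.
         sendRemoveShuffleMerge(host, port, shuffleId, shuffleMergeId);
         return true;
       } catch (Exception e) {
         // Best-effort cleanup: a failed send only means stale merged files may linger,
         // so log at debug instead of error to avoid polluting the logs.
         logger.debug("Exception while sending RemoveShuffleMerge request to {}:{}", host, port, e);
         return false;
       }
     }

     private void sendRemoveShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId)
         throws Exception {
       throw new UnsupportedOperationException("transport layer omitted in this sketch");
     }
   }
   ```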






[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1068956139


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java:
##
@@ -256,6 +256,23 @@ public void onFailure(Throwable e) {
     }
   }
 
+  @Override
+  public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) {
+    checkInit();
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      client.send(
+          new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, shuffleMergeId)
+              .toByteBuffer());
+      // TODO(SPARK-42025): Add some error logs for RemoveShuffleMerge RPC.
+    } catch (Exception e) {
+      logger.error("Exception while sending RemoveShuffleMerge request to {}:{}",

Review Comment:
   Let us make it `info` in this PR - we can revisit it as part of SPARK-42025.






[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1068472060


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java:
##
@@ -256,6 +256,22 @@ public void onFailure(Throwable e) {
     }
   }
 
+  @Override
+  public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) {
+    checkInit();
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      client.send(
+          new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, shuffleMergeId)

Review Comment:
   Can you also add a `TODO` here (specifying the jira)? Thx



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java:
##
@@ -256,6 +256,22 @@ public void onFailure(Throwable e) {
     }
   }
 
+  @Override
+  public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) {
+    checkInit();
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      client.send(
+          new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, shuffleMergeId)

Review Comment:
   Can you also add a `TODO` here (specifying the jira)? Thx






[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1068470671


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -470,6 +530,40 @@ void closeAndDeleteOutdatedPartitions(
       });
   }
 
+  void deleteMergedFiles(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId,
+      AppShuffleInfo appShuffleInfo,
+      int[] reduceIds,
+      boolean deleteFromDB) {
+    if (deleteFromDB) {
+      removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
+    }
+    int shuffleId = appAttemptShuffleMergeId.shuffleId;
+    int shuffleMergeId = appAttemptShuffleMergeId.shuffleMergeId;
+    int dataFilesDeleteCnt = 0;
+    int indexFilesDeleteCnt = 0;
+    int metaFilesDeleteCnt = 0;
+    for (int reduceId : reduceIds) {
+      File dataFile =
+          appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, reduceId);
+      if (dataFile.delete()) {

Review Comment:
   `delete` won't throw an exception - it only returns `false` if the delete failed (I am ignoring `SecurityException` here, since that is not relevant in this context).
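   A minimal illustration of that `java.io.File#delete` contract - the caller counts successes rather than catching exceptions; the class and method names here are made up and are not the ones in `RemoteBlockPushResolver`:
   ```java
   import java.io.File;

   class DeleteCountSketch {
     // File.delete() signals failure through its boolean return value rather than
     // throwing (SecurityException aside), so the caller just counts what it removed.
     static int deleteAll(File... files) {
       int deleted = 0;
       for (File f : files) {
         if (f.delete()) {   // false => the file was not removed (missing, locked, ...)
           deleted++;
         }
       }
       return deleted;
     }
   }
   ```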






[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1068018231


##
core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala:
##
@@ -913,6 +918,59 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     slaveRpcEnv.shutdown()
   }
 
+  test("SPARK-40480: shuffle remove should cleanup merged files as well") {
+    val newConf = new SparkConf
+    newConf.set("spark.shuffle.push.enabled", "true")
+    newConf.set("spark.shuffle.service.enabled", "true")
+    newConf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+    newConf.set(IS_TESTING, true)
+
+    val SHUFFLE_ID = 10
+    // needs TorrentBroadcast so need a SparkContext
+    withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc =>
+      val rpcEnv = sc.env.rpcEnv
+      val masterTracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+
+      val blockStoreClient = mock(classOf[ExternalBlockStoreClient])
+      val blockManagerMasterEndpoint = new BlockManagerMasterEndpoint(
+        rpcEnv,
+        sc.isLocal,
+        sc.conf,
+        sc.listenerBus,
+        Some(blockStoreClient),
+        // We dont care about this ...
+        new concurrent.TrieMap[BlockManagerId, BlockManagerInfo](),
+        masterTracker,
+        sc.env.shuffleManager,
+        true
+      )
+      rpcEnv.stop(sc.env.blockManager.master.driverEndpoint)
+      sc.env.blockManager.master.driverEndpoint =
+        rpcEnv.setupEndpoint(BlockManagerMaster.DRIVER_ENDPOINT_NAME,
+          blockManagerMasterEndpoint)
+
+      masterTracker.registerShuffle(SHUFFLE_ID, 10, 10)
+      val mergerLocs = (1 to 10).map(x => BlockManagerId(s"exec-$x", s"host-$x", x))
+      masterTracker.registerShufflePushMergerLocations(SHUFFLE_ID, mergerLocs)
+
+      assert(masterTracker.getShufflePushMergerLocations(SHUFFLE_ID).map(_.host).toSet ==
+        mergerLocs.map(_.host).toSet)
+
+      val foundHosts = JCollections.synchronizedSet(new JHashSet[String]())
+      when(blockStoreClient.removeShuffleMerge(any(), any(), any(), any())).thenAnswer(
+        (m: InvocationOnMock) => {
+          val host = m.getArgument(0).asInstanceOf[String]
+          val shuffleId = m.getArgument(2).asInstanceOf[Int]
+          assert(shuffleId == SHUFFLE_ID)
+          foundHosts.add(host)
+          true
+        })
+
+      sc.cleaner.get.doCleanupShuffle(SHUFFLE_ID, blocking = true)
+      assert(foundHosts.asScala == mergerLocs.map(_.host).toSet)
+    }
+  }

Review Comment:
   There is a bug in this test - sorry about that.
   Please take a look here: 
https://github.com/apache/spark/commit/26b2d1897ec2449ad202e61fc815708829837d98






[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067941354


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+val mergerLocations =
+  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+  } else {
+Seq.empty[BlockManagerId]
+  }

Review Comment:
   (Edited comment, removed incorrect description and removed diffs - that is 
in the link below).
   
   @wankunde, take a look at this - it includes the test and also fixes the issue you described in the rest of the method.
   
https://github.com/apache/spark/commit/26b2d1897ec2449ad202e61fc815708829837d98
   
   Please feel free to adapt it to your needs.
   
   



##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+val mergerLocations =
+  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+  } else {
+Seq.empty[BlockManagerId]
+  }

Review Comment:
   Thinking about it more, I think your assessment is right - since MOT (the `MapOutputTracker`) is non-null: I will need to check more.
   If so, we need to move the block computing `removeShuffleFromExecutorsFutures` to the end of the method (that should be sufficient) ... currently it is a race condition, and the bug need not get triggered every time.
   
   The test I added above is slightly broken - the initial initialization is not handled (when the driver would be registered).
   A bit late for me to try to fix it :-)
   
   Can you take a look at it @wankunde? We can fix the test and add it to the PR.
   






[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067973800


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+val mergerLocations =
+  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+  } else {
+Seq.empty[BlockManagerId]
+  }

Review Comment:
   Thinking about it more, I think your assessment is right - since MOT (the `MapOutputTracker`) is non-null: I will need to check more.
   If so, we need to move the block computing `removeShuffleFromExecutorsFutures` to the end of the method (that should be sufficient) ... currently it is a race condition, and the bug need not get triggered every time.
   
   The test I added above is slightly broken - the initial initialization is not handled (when the driver would be registered).
   A bit late for me to try to fix it :-)
   
   Can you take a look at it @wankunde? We can fix the test and add it to the PR.
   






[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067973800


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+val mergerLocations =
+  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+  } else {
+Seq.empty[BlockManagerId]
+  }

Review Comment:
   Thinking about it more, I think your assessment is right - since MOT (the `MapOutputTracker`) is non-null: I will need to check more.
   If so, we need to move the block computing `removeShuffleFromExecutorsFutures` to the end of the method (that should be sufficient) ... currently it is a race condition, and the bug need not get triggered every time.
   
   The test above is slightly broken - the initial initialization is not handled (when the driver would be registered).
   A bit late for me to try to fix it :-)
   
   Can you take a look at it @wankunde? We can fix the test and add it to the PR.
   






[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067973800


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+val mergerLocations =
+  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+  } else {
+Seq.empty[BlockManagerId]
+  }

Review Comment:
   Thinking about it more, I think your assessment is right - since MOT (the `MapOutputTracker`) is non-null: I will need to check more.
   If so, we need to move `removeShuffleFromExecutorsFutures` to the end of the method (that should be sufficient) ... currently it is a race condition, and the bug need not get triggered every time.
   
   The test above is slightly broken - the initial initialization is not handled (when the driver would be registered).
   A bit late for me to try to fix it :-)
   
   Can you take a look at it @wankunde? We can fix the test and add it to the PR.
   






[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067973800


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+val mergerLocations =
+  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+  } else {
+Seq.empty[BlockManagerId]
+  }

Review Comment:
   Thinking about it more, I think your assessment is right: I will need to check more.
   If so, we need to move `removeShuffleFromExecutorsFutures` to the end of the method (that should be sufficient) ... currently it is a race condition, and the bug need not get triggered every time.
   
   The test above is slightly broken - the initial initialization is not handled (when the driver would be registered).
   A bit late for me to try to fix it :-)
   
   Can you take a look at it @wankunde? We can fix the test and add it to the PR.
   






[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067973800


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+val mergerLocations =
+  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+  } else {
+Seq.empty[BlockManagerId]
+  }

Review Comment:
   Thinking about it more, I think your assessment is right: I will need to check more.
   If so, we need to move `removeShuffleFromExecutorsFutures` to the end of the method (that should be sufficient).
   
   The test above is slightly broken - the initial initialization is not handled (when the driver would be registered).
   A bit late for me to try to fix it :-)
   
   Can you take a look at it @wankunde? We can fix the test and add it to the PR.
   






[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067973800


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+val mergerLocations =
+  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+  } else {
+Seq.empty[BlockManagerId]
+  }

Review Comment:
   Thinking about it more, I think your assessment is right: I will need to check more.
   If so, we need to move the first `bm.storageEndpoint` block to the end of the method.
   
   The test above is slightly broken - the initial initialization is not handled (when the driver would be registered).
   A bit late for me to try to fix it :-)
   
   Can you take a look at it @wankunde? We can fix the test and add it to the PR.
   






[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067941354


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+val mergerLocations =
+  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+  } else {
+Seq.empty[BlockManagerId]
+  }

Review Comment:
   `RemoveShuffle` is sent to the storage endpoint in `removeShuffle` - not to 
the block manager.
   
   You can test it with this diff:
   
   ```
   diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
   index 9f03228bb4f..e04ef7f11db 100644
   --- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
   +++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
   @@ -321,12 +321,6 @@ class BlockManagerMasterEndpoint(
  }

  private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
   -val mergerLocations =
   -  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
   -mapOutputTracker.getShufflePushMergerLocations(shuffleId)
   -  } else {
   -Seq.empty[BlockManagerId]
   -  }
val removeMsg = RemoveShuffle(shuffleId)
val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { 
bm =>
  bm.storageEndpoint.ask[Boolean](removeMsg).recover {
   @@ -374,6 +368,13 @@ class BlockManagerMasterEndpoint(

val removeShuffleMergeFromShuffleServicesFutures =
  externalBlockStoreClient.map { shuffleClient =>
   +val mergerLocations = {
   +  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
   +mapOutputTracker.getShufflePushMergerLocations(shuffleId)
   +  } else {
   +Seq.empty[BlockManagerId]
   +  }
   +}
mergerLocations.map { bmId =>
  Future[Boolean] {
shuffleClient.removeShuffleMerge(bmId.host, bmId.port, 
shuffleId,
   diff --git 
a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
   index a13527f4b74..3205b0ba589 100644
   --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
   +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
   @@ -17,12 +17,16 @@

package org.apache.spark

   +import java.util.{Collections => JCollections, HashSet => JHashSet}
import java.util.concurrent.atomic.LongAdder

   +import scala.collection.JavaConverters._
   +import scala.collection.concurrent
import scala.collection.mutable.ArrayBuffer

import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
   +import org.mockito.invocation.InvocationOnMock
import org.roaringbitmap.RoaringBitmap

import org.apache.spark.LocalSparkContext._
   @@ -30,10 +34,11 @@ import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Network.{RPC_ASK_TIMEOUT, 
RPC_MESSAGE_MAX_SIZE}
import org.apache.spark.internal.config.Tests.IS_TESTING
   +import org.apache.spark.network.shuffle.ExternalBlockStoreClient
import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv}
import org.apache.spark.scheduler.{CompressedMapStatus, 
HighlyCompressedMapStatus, MapStatus, MergeStatus}
import org.apache.spark.shuffle.FetchFailedException
   -import org.apache.spark.storage.{BlockManagerId, ShuffleBlockId, 
ShuffleMergedBlockId}
   +import org.apache.spark.storage.{BlockManagerId, BlockManagerInfo, 
BlockManagerMaster, BlockManagerMasterEndpoint, ShuffleBlockId, 
ShuffleMergedBlockId}

class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
  private val conf = new SparkConf
   @@ -913,9 +918,64 @@ class MapOutputTrackerSuite extends SparkFunSuite with 
LocalSparkContext {
slaveRpcEnv.shutdown()
  }

   +  test("SPARK-40480: shuffle remove should cleanup merged files as well") {
   +
   +val newConf = new SparkConf
   +newConf.set("spark.shuffle.push.enabled", "true")
   +newConf.set("spark.shuffle.service.enabled", "true")
   +newConf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
   +newConf.set(IS_TESTING, true)
   +
   +val SHUFFLE_ID = 10
   +
   +// needs TorrentBroadcast so need a SparkContext
   +withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) 
{ sc =>
   +  val rpcEnv = sc.env.rpcEnv
   +  val masterTracker = 
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
   +
   +  val 

[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067941354


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+val mergerLocations =
+  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+  } else {
+Seq.empty[BlockManagerId]
+  }

Review Comment:
   `RemoveShuffle` is sent to the storage endpoint - not to the block manager.
   
   You can test it with this diff:
   
   ```
   diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
   index 9f03228bb4f..e04ef7f11db 100644
   --- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
   +++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
   @@ -321,12 +321,6 @@ class BlockManagerMasterEndpoint(
  }

  private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
   -val mergerLocations =
   -  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
   -mapOutputTracker.getShufflePushMergerLocations(shuffleId)
   -  } else {
   -Seq.empty[BlockManagerId]
   -  }
val removeMsg = RemoveShuffle(shuffleId)
val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { 
bm =>
  bm.storageEndpoint.ask[Boolean](removeMsg).recover {
   @@ -374,6 +368,13 @@ class BlockManagerMasterEndpoint(

val removeShuffleMergeFromShuffleServicesFutures =
  externalBlockStoreClient.map { shuffleClient =>
   +val mergerLocations = {
   +  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
   +mapOutputTracker.getShufflePushMergerLocations(shuffleId)
   +  } else {
   +Seq.empty[BlockManagerId]
   +  }
   +}
mergerLocations.map { bmId =>
  Future[Boolean] {
shuffleClient.removeShuffleMerge(bmId.host, bmId.port, 
shuffleId,
   diff --git 
a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
   index a13527f4b74..3205b0ba589 100644
   --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
   +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
   @@ -17,12 +17,16 @@

package org.apache.spark

   +import java.util.{Collections => JCollections, HashSet => JHashSet}
import java.util.concurrent.atomic.LongAdder

   +import scala.collection.JavaConverters._
   +import scala.collection.concurrent
import scala.collection.mutable.ArrayBuffer

import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
   +import org.mockito.invocation.InvocationOnMock
import org.roaringbitmap.RoaringBitmap

import org.apache.spark.LocalSparkContext._
   @@ -30,10 +34,11 @@ import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Network.{RPC_ASK_TIMEOUT, 
RPC_MESSAGE_MAX_SIZE}
import org.apache.spark.internal.config.Tests.IS_TESTING
   +import org.apache.spark.network.shuffle.ExternalBlockStoreClient
import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv}
import org.apache.spark.scheduler.{CompressedMapStatus, 
HighlyCompressedMapStatus, MapStatus, MergeStatus}
import org.apache.spark.shuffle.FetchFailedException
   -import org.apache.spark.storage.{BlockManagerId, ShuffleBlockId, 
ShuffleMergedBlockId}
   +import org.apache.spark.storage.{BlockManagerId, BlockManagerInfo, 
BlockManagerMaster, BlockManagerMasterEndpoint, ShuffleBlockId, 
ShuffleMergedBlockId}

class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
  private val conf = new SparkConf
   @@ -913,9 +918,64 @@ class MapOutputTrackerSuite extends SparkFunSuite with 
LocalSparkContext {
slaveRpcEnv.shutdown()
  }

   +  test("SPARK-40480: shuffle remove should cleanup merged files as well") {
   +
   +val newConf = new SparkConf
   +newConf.set("spark.shuffle.push.enabled", "true")
   +newConf.set("spark.shuffle.service.enabled", "true")
   +newConf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
   +newConf.set(IS_TESTING, true)
   +
   +val SHUFFLE_ID = 10
   +
   +// needs TorrentBroadcast so need a SparkContext
   +withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) 
{ sc =>
   +  val rpcEnv = sc.env.rpcEnv
   +  val masterTracker = 
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
   +
   +  val blockStoreClient = 

[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067941354


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+val mergerLocations =
+  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+  } else {
+Seq.empty[BlockManagerId]
+  }

Review Comment:
   `RemoveShuffle` is sent to the storage endpoint - not to the block manager.
   
   You can test it with this diff:
   
   ```
   diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
   index 9f03228bb4f..e04ef7f11db 100644
   --- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
   +++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
   @@ -321,12 +321,6 @@ class BlockManagerMasterEndpoint(
  }

  private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
   -val mergerLocations =
   -  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
   -mapOutputTracker.getShufflePushMergerLocations(shuffleId)
   -  } else {
   -Seq.empty[BlockManagerId]
   -  }
val removeMsg = RemoveShuffle(shuffleId)
val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { 
bm =>
  bm.storageEndpoint.ask[Boolean](removeMsg).recover {
   @@ -374,6 +368,13 @@ class BlockManagerMasterEndpoint(

val removeShuffleMergeFromShuffleServicesFutures =
  externalBlockStoreClient.map { shuffleClient =>
   +val mergerLocations = {
   +  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
   +mapOutputTracker.getShufflePushMergerLocations(shuffleId)
   +  } else {
   +Seq.empty[BlockManagerId]
   +  }
   +}
mergerLocations.map { bmId =>
  Future[Boolean] {
shuffleClient.removeShuffleMerge(bmId.host, bmId.port, 
shuffleId,
   diff --git 
a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
   index a13527f4b74..fb9dc8ff29b 100644
   --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
   +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
   @@ -17,9 +17,12 @@

package org.apache.spark

   +import java.util.{Collections => JCollections, HashSet => JHashSet}
import java.util.concurrent.atomic.LongAdder

   +import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
   +import scala.collection.concurrent

import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
   @@ -30,10 +33,12 @@ import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Network.{RPC_ASK_TIMEOUT, 
RPC_MESSAGE_MAX_SIZE}
import org.apache.spark.internal.config.Tests.IS_TESTING
   +import org.apache.spark.network.shuffle.ExternalBlockStoreClient
import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv}
import org.apache.spark.scheduler.{CompressedMapStatus, 
HighlyCompressedMapStatus, MapStatus, MergeStatus}
import org.apache.spark.shuffle.FetchFailedException
   -import org.apache.spark.storage.{BlockManagerId, ShuffleBlockId, 
ShuffleMergedBlockId}
   +import org.apache.spark.storage.{BlockManagerId, BlockManagerInfo, 
BlockManagerMaster, BlockManagerMasterEndpoint, ShuffleBlockId, 
ShuffleMergedBlockId}
   +import org.mockito.invocation.InvocationOnMock

class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
  private val conf = new SparkConf
   @@ -913,9 +918,64 @@ class MapOutputTrackerSuite extends SparkFunSuite with 
LocalSparkContext {
slaveRpcEnv.shutdown()
  }

   +  test("SPARK-40480: shuffle remove should cleanup merged files as well") {
   +
   +val newConf = new SparkConf
   +newConf.set("spark.shuffle.push.enabled", "true")
   +newConf.set("spark.shuffle.service.enabled", "true")
   +newConf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
   +newConf.set(IS_TESTING, true)
   +
   +val SHUFFLE_ID = 10
   +
   +// needs TorrentBroadcast so need a SparkContext
   +withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) 
{ sc =>
   +  val rpcEnv = sc.env.rpcEnv
   +  val masterTracker = 
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
   +
   +  val blockStoreClient = mock(classOf[ExternalBlockStoreClient])
   +  val blockManagerMasterEndpoint = new 

[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067941354


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+val mergerLocations =
+  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+  } else {
+Seq.empty[BlockManagerId]
+  }

Review Comment:
   `RemoveShuffle` is sent to the storage endpoint - not to the block manager.
   
   You can test it with this diff:
   
   ```
   diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
   index 9f03228bb4f..e04ef7f11db 100644
   --- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
   +++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
   @@ -321,12 +321,6 @@ class BlockManagerMasterEndpoint(
  }

  private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
   -val mergerLocations =
   -  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
   -mapOutputTracker.getShufflePushMergerLocations(shuffleId)
   -  } else {
   -Seq.empty[BlockManagerId]
   -  }
val removeMsg = RemoveShuffle(shuffleId)
val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { 
bm =>
  bm.storageEndpoint.ask[Boolean](removeMsg).recover {
   @@ -374,6 +368,13 @@ class BlockManagerMasterEndpoint(

val removeShuffleMergeFromShuffleServicesFutures =
  externalBlockStoreClient.map { shuffleClient =>
   +val mergerLocations = {
   +  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
   +mapOutputTracker.getShufflePushMergerLocations(shuffleId)
   +  } else {
   +Seq.empty[BlockManagerId]
   +  }
   +}
mergerLocations.map { bmId =>
  Future[Boolean] {
shuffleClient.removeShuffleMerge(bmId.host, bmId.port, 
shuffleId,
   diff --git 
a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
   index a13527f4b74..fb9dc8ff29b 100644
   --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
   +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
   @@ -17,9 +17,12 @@

package org.apache.spark

   +import java.util.{Collections => JCollections, HashSet => JHashSet}
import java.util.concurrent.atomic.LongAdder

   +import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
   +import scala.collection.concurrent

import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
   @@ -30,10 +33,12 @@ import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Network.{RPC_ASK_TIMEOUT, 
RPC_MESSAGE_MAX_SIZE}
import org.apache.spark.internal.config.Tests.IS_TESTING
   +import org.apache.spark.network.shuffle.ExternalBlockStoreClient
import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv}
import org.apache.spark.scheduler.{CompressedMapStatus, 
HighlyCompressedMapStatus, MapStatus, MergeStatus}
import org.apache.spark.shuffle.FetchFailedException
   -import org.apache.spark.storage.{BlockManagerId, ShuffleBlockId, 
ShuffleMergedBlockId}
   +import org.apache.spark.storage.{BlockManagerId, BlockManagerInfo, 
BlockManagerMaster, BlockManagerMasterEndpoint, ShuffleBlockId, 
ShuffleMergedBlockId}
   +import org.mockito.invocation.InvocationOnMock

class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
  private val conf = new SparkConf
   @@ -913,9 +918,64 @@ class MapOutputTrackerSuite extends SparkFunSuite with 
LocalSparkContext {
slaveRpcEnv.shutdown()
  }

   +  test("SPARK-40480: shuffle remove should cleanup merged files as well") {
   +
   +val newConf = new SparkConf
   +newConf.set("spark.shuffle.push.enabled", "true")
   +newConf.set("spark.shuffle.service.enabled", "true")
   +newConf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
   +newConf.set(IS_TESTING, true)
   +
   +val SHUFFLE_ID = 10
   +
   +// needs TorrentBroadcast so need a SparkContext
   +withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) 
{ sc =>
   +  val rpcEnv = sc.env.rpcEnv
   +  val masterTracker = 
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
   +
   +  val blockStoreClient = mock(classOf[ExternalBlockStoreClient])
   +  val blockManagerMasterEndpoint = new 

[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-12 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067941354


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+val mergerLocations =
+  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+  } else {
+Seq.empty[BlockManagerId]
+  }

Review Comment:
   `RemoveShuffle` is sent to the storage endpoint - not to the block manager.
   
   You can test it with this diff:
   
   ```
   diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
   index 9f03228bb4f..e04ef7f11db 100644
   --- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
   +++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
   @@ -321,12 +321,6 @@ class BlockManagerMasterEndpoint(
  }

  private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
   -val mergerLocations =
   -  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
   -mapOutputTracker.getShufflePushMergerLocations(shuffleId)
   -  } else {
   -Seq.empty[BlockManagerId]
   -  }
val removeMsg = RemoveShuffle(shuffleId)
val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { 
bm =>
  bm.storageEndpoint.ask[Boolean](removeMsg).recover {
   @@ -374,6 +368,13 @@ class BlockManagerMasterEndpoint(

val removeShuffleMergeFromShuffleServicesFutures =
  externalBlockStoreClient.map { shuffleClient =>
   +val mergerLocations = {
   +  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
   +mapOutputTracker.getShufflePushMergerLocations(shuffleId)
   +  } else {
   +Seq.empty[BlockManagerId]
   +  }
   +}
mergerLocations.map { bmId =>
  Future[Boolean] {
shuffleClient.removeShuffleMerge(bmId.host, bmId.port, 
shuffleId,
   diff --git a/core/src/test/resources/log4j2.properties 
b/core/src/test/resources/log4j2.properties
   index ab02104c696..b906523789d 100644
   --- a/core/src/test/resources/log4j2.properties
   +++ b/core/src/test/resources/log4j2.properties
   @@ -16,7 +16,7 @@
#

# Set everything to be logged to the file target/unit-tests.log
   -rootLogger.level = info
   +rootLogger.level = debug
rootLogger.appenderRef.file.ref = ${sys:test.appender:-File}

appender.file.type = File
   diff --git 
a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
   index a13527f4b74..fb9dc8ff29b 100644
   --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
   +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
   @@ -17,9 +17,12 @@

package org.apache.spark

   +import java.util.{Collections => JCollections, HashSet => JHashSet}
import java.util.concurrent.atomic.LongAdder

   +import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
   +import scala.collection.concurrent

import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
   @@ -30,10 +33,12 @@ import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Network.{RPC_ASK_TIMEOUT, 
RPC_MESSAGE_MAX_SIZE}
import org.apache.spark.internal.config.Tests.IS_TESTING
   +import org.apache.spark.network.shuffle.ExternalBlockStoreClient
import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv}
import org.apache.spark.scheduler.{CompressedMapStatus, 
HighlyCompressedMapStatus, MapStatus, MergeStatus}
import org.apache.spark.shuffle.FetchFailedException
   -import org.apache.spark.storage.{BlockManagerId, ShuffleBlockId, 
ShuffleMergedBlockId}
   +import org.apache.spark.storage.{BlockManagerId, BlockManagerInfo, 
BlockManagerMaster, BlockManagerMasterEndpoint, ShuffleBlockId, 
ShuffleMergedBlockId}
   +import org.mockito.invocation.InvocationOnMock

class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
  private val conf = new SparkConf
   @@ -913,9 +918,64 @@ class MapOutputTrackerSuite extends SparkFunSuite with 
LocalSparkContext {
slaveRpcEnv.shutdown()
  }

   +  test("SPARK-40480: shuffle remove should cleanup merged files as well") {
   +
   +val newConf = new SparkConf
   +newConf.set("spark.shuffle.push.enabled", "true")
   +newConf.set("spark.shuffle.service.enabled", "true")
   +newConf.set(SERIALIZER, 

[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-11 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067741652


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+val mergerLocations =
+  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+  } else {
+Seq.empty[BlockManagerId]
+  }

Review Comment:
   To give context, `mapOutputTrackerMaster.unregisterShuffle` happens after `shuffleDriverComponents.removeShuffle` - and `BlockManagerMaster.removeShuffle` does an indefinite wait for `RemoveShuffle` to complete.
   So we should have non-empty mergers, since `shuffleStatuses` has not yet been cleaned up for that shuffle id.
   
   The `RemoveShuffle` from within `master.removeShuffle` sends `RemoveShuffle` to the workers - where it simply cleans up the client-side maps.
   
   Let me know if I am missing something!






[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-11 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067733179


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+val mergerLocations =
+  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+  } else {
+Seq.empty[BlockManagerId]
+  }

Review Comment:
   I am not sure I understand how we are getting empty mergers - can you please give details @wankunde? Thx






[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-11 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067637314


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java:
##
@@ -256,6 +256,22 @@ public void onFailure(Throwable e) {
     }
   }
 
+  @Override
+  public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) {
+    checkInit();
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      client.send(
+          new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, shuffleMergeId)

Review Comment:
   @wankunde, can you file a follow-up jira for this? And add a TODO here for that jira please?
   We can update it later based on the jira discussion - and unblock this PR 
from it.



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java:
##
@@ -256,6 +256,22 @@ public void onFailure(Throwable e) {
     }
   }
 
+  @Override
+  public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) {
+    checkInit();
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      client.send(
+          new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, shuffleMergeId)

Review Comment:
   @wankunde, can you file a follow-up jira for this? And add a TODO here for that jira please?
   We can update it later based on the jira discussion - and unblock this PR 
from it.






[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-11 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067554329


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java:
##
@@ -84,4 +85,9 @@ public MergedBlockMeta getMergedBlockMeta(
   public String[] getMergedBlockDirs(String appId) {
     throw new UnsupportedOperationException("Cannot handle shuffle block merge");
   }
+
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge removeShuffleMerge) {

Review Comment:
   This was changed as part of review feedback - `finalizeShuffleMerge` is an existing example of this pattern, where the message object itself is passed in instead of pulling the values out of the data class.
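   Roughly, the two alternatives look like this - illustrative signatures only; `RemoveShuffleMergeMsg` is a hypothetical stand-in for the real `RemoveShuffleMerge` protocol class:
   ```java
   // Variant 1: unpack the fields at the call site.
   interface UnpackedStyle {
     void removeShuffleMerge(String appId, int appAttemptId, int shuffleId, int shuffleMergeId);
   }

   // Variant 2 (the pattern finalizeShuffleMerge already uses): pass the message itself,
   // so new fields on the protocol class do not ripple through every signature.
   interface MessageStyle {
     void removeShuffleMerge(RemoveShuffleMergeMsg removeShuffleMerge);
   }

   // Stand-in for the real protocol message.
   final class RemoveShuffleMergeMsg {
     final String appId;
     final int appAttemptId;
     final int shuffleId;
     final int shuffleMergeId;

     RemoveShuffleMergeMsg(String appId, int appAttemptId, int shuffleId, int shuffleMergeId) {
       this.appId = appId;
       this.appAttemptId = appAttemptId;
       this.shuffleId = shuffleId;
       this.shuffleMergeId = shuffleMergeId;
     }
   }
   ```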






[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-11 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067551485


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java:
##
@@ -256,6 +256,22 @@ public void onFailure(Throwable e) {
 }
   }
 
+  @Override
+  public boolean removeShuffleMerge(String host, int port, int shuffleId, int 
shuffleMergeId) {
+checkInit();
+try {
+  TransportClient client = clientFactory.createClient(host, port);
+  client.send(
+  new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, 
shuffleMergeId)

Review Comment:
   Failure to remove blocks in `removeBlocks` has an impact on resource utilization (for example, a block could still be in memory). Here, it is more of a best-effort cleanup - and it can fail in the context of decommissioned nodes, etc - so log pollution is something I am concerned with here.
   
   Having said that - the exact same problem unfortunately exists for `removeShuffle.removeShuffleFromShuffleServicesFutures` - and that is an issue: I would argue for reducing that log message from `warn` to `info` (or even `debug`) in `ExternalBlockStoreClient.removeBlocks` when failing to remove shuffle files from ESS.
   
   We can make the changes here to log at `info` - and make the changes to `removeBlocks` for ESS block removal later.
   
   Thoughts ?
   
   +CC @tgravescs who reviewed #35085
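   For illustration, a minimal sketch of the kind of change argued for above - demoting a best-effort removal failure from `warn` to `info`. The class and method here are hypothetical stand-ins, not the actual `ExternalBlockStoreClient` code.
   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   // Hypothetical stand-in showing the proposed log-level demotion for
   // best-effort shuffle cleanup failures (e.g. against decommissioned nodes).
   class ShuffleCleanupLoggingSketch {
     private static final Logger logger = LoggerFactory.getLogger(ShuffleCleanupLoggingSketch.class);

     void onRemovalFailure(String host, int port, int shuffleId, Throwable cause) {
       // Previously a warn on every failure; an info (or debug) keeps the logs
       // quiet for failures that are expected during best-effort cleanup.
       logger.info("Failed best-effort removal of merged shuffle {} on {}:{}",
           shuffleId, host, port, cause);
     }
   }
   ```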



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-11 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067551485


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java:
##
@@ -256,6 +256,22 @@ public void onFailure(Throwable e) {
 }
   }
 
+  @Override
+  public boolean removeShuffleMerge(String host, int port, int shuffleId, int 
shuffleMergeId) {
+checkInit();
+try {
+  TransportClient client = clientFactory.createClient(host, port);
+  client.send(
+  new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, 
shuffleMergeId)

Review Comment:
   Failure to remove blocks in `removeBlocks` has an impact on resource utilization (for example, a block could still be in memory). Here, it is more of a best-effort cleanup - and it can fail in the context of decommissioned nodes, etc - so log pollution is something I am concerned with here.
   
   Having said that - the exact same problem unfortunately exists for `removeShuffle.removeShuffleFromShuffleServicesFutures` - and that is an issue: I would argue for reducing that log message from `warn` to `info` (or even `debug`) in `ExternalBlockStoreClient.removeBlocks` when failing to remove shuffle files from ESS.
   
   We can make the changes here to log at `info` - and make the changes to `removeBlocks` for ESS block removal later.
   
   Thoughts ?
   
   +CC @tgravescs, @Ngone51  who reviewed #35085



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-11 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067551485


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java:
##
@@ -256,6 +256,22 @@ public void onFailure(Throwable e) {
 }
   }
 
+  @Override
+  public boolean removeShuffleMerge(String host, int port, int shuffleId, int 
shuffleMergeId) {
+checkInit();
+try {
+  TransportClient client = clientFactory.createClient(host, port);
+  client.send(
+  new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, 
shuffleMergeId)

Review Comment:
   Failure to remove blocks in `removeBlocks` has an impact on resource utilization (for example, a block could still be in memory). Here, it is more of a best-effort cleanup - and it can fail in the context of decommissioned nodes, etc - so log pollution is something I am concerned with here.
   
   Having said that - the exact same problem unfortunately exists for `removeShuffle.removeShuffleFromShuffleServicesFutures` - and that is an issue: I would argue for reducing that log message from `warn` to `info` or even `debug` as well, for failures when removing shuffle files from ESS.
   
   We can make the changes here to log at `info` - and make the changes to `removeBlocks` for ESS block removal later.
   
   Thoughts ?
   
   +CC @tgravescs who reviewed #35085



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-11 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067551485


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java:
##
@@ -256,6 +256,22 @@ public void onFailure(Throwable e) {
 }
   }
 
+  @Override
+  public boolean removeShuffleMerge(String host, int port, int shuffleId, int 
shuffleMergeId) {
+checkInit();
+try {
+  TransportClient client = clientFactory.createClient(host, port);
+  client.send(
+  new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, 
shuffleMergeId)

Review Comment:
   Failure to remove blocks in `removeBlocks` has an impact on resource utilization (for example, a block could still be in memory). Here, it is more of a best-effort cleanup - and it can fail in the context of decommissioned nodes, etc - so log pollution is something I am concerned with here.
   
   Having said that - the exact same problem unfortunately exists for `removeShuffle.removeShuffleFromShuffleServicesFutures` - and that is an issue: I would argue for reducing that log message from `warn` to `info` or even `debug` as well, for failures when removing shuffle files from ESS.
   
   We can make the changes here to log at `info` - and make the changes to `removeBlocks` for ESS block removal later.
   
   Thoughts ?
   
   +CC @tgravescs who reviewed #35085



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-11 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067551485


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java:
##
@@ -256,6 +256,22 @@ public void onFailure(Throwable e) {
 }
   }
 
+  @Override
+  public boolean removeShuffleMerge(String host, int port, int shuffleId, int 
shuffleMergeId) {
+checkInit();
+try {
+  TransportClient client = clientFactory.createClient(host, port);
+  client.send(
+  new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, 
shuffleMergeId)

Review Comment:
   Failure to remove blocks in `removeBlocks` has an impact on resource utilization (for example, a block could still be in memory). Here, it is more of a best-effort cleanup - and it can fail in the context of decommissioned nodes, etc - so log pollution is something I am concerned with here.
   
   Having said that - the exact same problem unfortunately exists for `removeShuffle.removeShuffleFromShuffleServicesFutures` - and that is an issue: I would argue for reducing that log message from `warn` to `info` or even `debug` as well, for failures when removing shuffle files from ESS.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-11 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1067548067


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
+val mergerLocations =
+  if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+  } else {
+Seq.empty[BlockManagerId]
+  }

Review Comment:
   I agree with @otterc, I don't see an issue with interleaving modifications impacting this - so we should be able to move it closer to where it is used.
   Keeping it closer is preferable - though I would call it a nit, like how Chandni has characterized it.
   We can also pull it within the `externalBlockStoreClient.map {` to tighten its visibility.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-10 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1066433357


##
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##
@@ -366,8 +372,19 @@ class BlockManagerMasterEndpoint(
 }
   }.getOrElse(Seq.empty)
 
+val removeShuffleMergeFromShuffleServicesFutures =
+  externalBlockStoreClient.map { shuffleClient =>
+mergerLocations.map { bmId =>
+  Future[Boolean] {
+shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId,
+  RemoteBlockPushResolver.DELETE_ALL_MERGED_SHUFFLE)

Review Comment:
   We already have this dependency - for example, 
`RemoteBlockPushResolver.MERGED_SHUFFLE*` in `BlockId`
   Initially I was going to propose moving this out - but given existing 
(internal) use, it looked fine for now.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-10 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1066428217


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+if (appShuffleInfo.attemptId != msg.appAttemptId) {
+  throw new IllegalArgumentException(
+  String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+  + "with the current attempt id %s stored in shuffle service 
for application %s",
+  msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+}
+appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+  if (mergePartitionsInfo == null) {
+if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+  return null;
+} else {
+  writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+  return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+}
+  }
+  boolean deleteCurrentMergedShuffle =
+  msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
+  msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+  int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ?
+  msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId;
+  AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+  new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, 
mergePartitionsInfo.shuffleMergeId);
+  AppAttemptShuffleMergeId appAttemptShuffleMergeId = new 
AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeId);
+  if(deleteCurrentMergedShuffle) {
+// request to clean up shuffle we are currently hosting
+if (!mergePartitionsInfo.isFinalized()) {
+  submitCleanupTask(() ->
+closeAndDeleteOutdatedPartitions(
+currentAppAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions));
+} else {
+  submitCleanupTask(() ->
+deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo,
+mergePartitionsInfo.getReduceIds(), false));
+}
+  } else if(shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
+throw new RuntimeException(String.format("Asked to remove old shuffle 
merged data for " +
+"application %s shuffleId %s shuffleMergeId %s, but current 
shuffleMergeId %s ",
+msg.appId, msg.shuffleId, shuffleMergeId, 
mergePartitionsInfo.shuffleMergeId));
+  } else if (shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
+// cleanup request for newer shuffle - remove the outdated data we 
have.
+if (!mergePartitionsInfo.isFinalized()) {
+  submitCleanupTask(() ->
+  closeAndDeleteOutdatedPartitions(

Review Comment:
   All file cleanup is async - for example, when a newer merge id is seen during finalize, etc.
   The metadata removal is immediate and within the critical section - so the files can't be reached once the method returns.
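   As a rough, self-contained sketch of that pattern (simplified stand-in types, not the actual `RemoteBlockPushResolver` code): the map entry is updated inside the `compute()` critical section, while file deletion is handed off to a cleanup executor.
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   // Simplified stand-in: synchronous metadata removal, asynchronous file cleanup.
   class MergeCleanupSketch {
     private final ConcurrentHashMap<Integer, String> shuffleToMergeDir = new ConcurrentHashMap<>();
     private final ExecutorService cleanupExecutor = Executors.newSingleThreadExecutor();

     void removeShuffle(int shuffleId) {
       shuffleToMergeDir.compute(shuffleId, (id, mergeDir) -> {
         if (mergeDir != null) {
           // File cleanup is async ...
           cleanupExecutor.submit(() -> deleteFilesUnder(mergeDir));
         }
         // ... but the metadata removal is immediate and inside the critical
         // section, so the entry cannot be reached once compute() returns.
         return null;
       });
     }

     private void deleteFilesUnder(String dir) {
       // Placeholder for the recursive file deletion.
     }
   }
   ```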



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2023-01-06 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1063954257


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -470,6 +527,39 @@ void closeAndDeleteOutdatedPartitions(
   });
   }
 
+  void deleteMergedFiles(AppAttemptShuffleMergeId appAttemptShuffleMergeId, 
int[] reduceIds) {
+removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
+AppShuffleInfo appShuffleInfo = 
validateAndGetAppShuffleInfo(appAttemptShuffleMergeId.appId);
+int shuffleId = appAttemptShuffleMergeId.shuffleId;
+int shuffleMergeId = appAttemptShuffleMergeId.shuffleMergeId;
+for (int reduceId : reduceIds) {
+  try {
+File dataFile =
+appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, 
reduceId);
+dataFile.delete();
+  } catch (Exception e) {

Review Comment:
   Can we fix this @wankunde ?
   Essentially, the changes are:
   
   a) There is no exception thrown in this block - we don't need the try/catch.
   b) When the delete fails, we don't need to do the `warn` (here and below): this can happen if application exit is racing against shuffle removal - the `info` at the end of the method will suffice.
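   A sketch of the requested shape, assuming `File.delete()` semantics (it returns `false` rather than throwing on an ordinary failure); the class and method names are stand-ins, not the PR code:
   ```java
   import java.io.File;
   import java.util.List;

   // Hypothetical helper: no try/catch and no per-file warn, just a count the
   // caller can log once at info level at the end of the method.
   class MergedFileCleanupSketch {
     int deleteAll(List<File> mergedFiles) {
       int deleted = 0;
       for (File f : mergedFiles) {
         // A failed delete is expected when application exit races against
         // shuffle removal, so it is intentionally not logged per file.
         if (f.delete()) {
           deleted++;
         }
       }
       return deleted;
     }
   }
   ```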



##
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##
@@ -1316,6 +1318,72 @@ public void 
testJsonSerializationOfPushShufflePartitionInfo() throws IOException
   RemoteBlockPushResolver.AppAttemptShuffleMergeId.class));
   }
 
+  @Test
+  public void testRemoveShuffleMerge() throws IOException, 
InterruptedException {
+Semaphore closed = new Semaphore(0);
+String testApp = "testRemoveShuffleMerge";
+RemoteBlockPushResolver pushResolver = new RemoteBlockPushResolver(conf, 
null) {
+  @Override
+  void closeAndDeleteOutdatedPartitions(
+  AppAttemptShuffleMergeId appAttemptShuffleMergeId,
+  Map partitions) {
+super.closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, 
partitions);
+closed.release();
+  }
+
+  @Override
+  void deleteMergedFiles(
+  AppAttemptShuffleMergeId appAttemptShuffleMergeId,
+  AppShuffleInfo appShuffleInfo,
+  int[] reduceIds,
+  boolean deleteFromDB) {
+super.deleteMergedFiles(appAttemptShuffleMergeId, appShuffleInfo, 
reduceIds, deleteFromDB);
+closed.release();
+  }
+};
+pushResolver.registerExecutor(testApp, new ExecutorShuffleInfo(
+prepareLocalDirs(localDirs, MERGE_DIRECTORY), 1, 
MERGE_DIRECTORY_META));
+
+// 1. Check whether the data is cleaned up when merged shuffle is finalized
+RemoteBlockPushResolver.AppShuffleInfo shuffleInfo =
+pushResolver.validateAndGetAppShuffleInfo(testApp);
+StreamCallbackWithID streamCallback1 = 
pushResolver.receiveBlockDataAsStream(
+new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 1, 0, 0, 0));
+streamCallback1.onData(streamCallback1.getID(), ByteBuffer.wrap(new 
byte[2]));
+streamCallback1.onComplete(streamCallback1.getID());
+pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, 
NO_ATTEMPT_ID, 0, 1));
+assertTrue(shuffleInfo.getMergedShuffleMetaFile(0, 1, 0).exists());
+assertTrue(new File(shuffleInfo.getMergedShuffleIndexFilePath(0, 1, 
0)).exists());
+assertTrue(shuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists());
+pushResolver.removeShuffleMerge(
+new RemoveShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 1));
+closed.tryAcquire(10, TimeUnit.SECONDS);
+assertFalse(shuffleInfo.getMergedShuffleMetaFile(0, 1, 0).exists());
+assertFalse(new File(shuffleInfo.getMergedShuffleIndexFilePath(0, 1, 
0)).exists());
+assertFalse(shuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists());
+
+// 2. Check whether the data is cleaned up when merged shuffle is not 
finalized.
+StreamCallbackWithID streamCallback2 = 
pushResolver.receiveBlockDataAsStream(
+new PushBlockStream(testApp, NO_ATTEMPT_ID, 2, 1, 0, 0, 0));
+streamCallback2.onData(streamCallback2.getID(), ByteBuffer.wrap(new 
byte[2]));
+streamCallback2.onComplete(streamCallback2.getID());
+assertTrue(shuffleInfo.getMergedShuffleMetaFile(2, 1, 0).exists());
+pushResolver.removeShuffleMerge(
+new RemoveShuffleMerge(testApp, NO_ATTEMPT_ID, 2, 1));
+closed.tryAcquire(10, TimeUnit.SECONDS);
+assertFalse(shuffleInfo.getMergedShuffleMetaFile(2, 1, 0).exists());
+
+// 3. Check whether the data is cleaned up when higher shuffleMergeId 
finalize request comes
+StreamCallbackWithID streamCallback3 = 
pushResolver.receiveBlockDataAsStream(
+new PushBlockStream(testApp, NO_ATTEMPT_ID, 3, 1, 0, 0, 0));
+streamCallback3.onData(streamCallback3.getID(), ByteBuffer.wrap(new 
byte[2]));
+streamCallback3.onComplete(streamCallback3.getID());
+

[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-30 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1059526011


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+if (appShuffleInfo.attemptId != msg.appAttemptId) {
+  throw new IllegalArgumentException(
+  String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+  + "with the current attempt id %s stored in shuffle service 
for application %s",
+  msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+}
+appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+  boolean deleteCurrent =
+  msg.shuffleMergeId == DELETE_CURRENT_MERGED_SHUFFLE_ID ||
+  msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+  AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+  new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, 
mergePartitionsInfo.shuffleMergeId);
+  AppAttemptShuffleMergeId appAttemptShuffleMergeId = new 
AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
+  if(deleteCurrent) {
+// request to clean up shuffle we are currently hosting
+if (!mergePartitionsInfo.isFinalized()) {
+  submitCleanupTask(() -> {
+closeAndDeleteOutdatedPartitions(
+currentAppAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions);
+writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+  });
+} else {
+  submitCleanupTask(() -> {
+deleteMergedFiles(currentAppAttemptShuffleMergeId,
+mergePartitionsInfo.getReduceIds());
+writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+mergePartitionsInfo.setReduceIds(new int[0]);
+  });
+}
+  } else if(msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
+throw new RuntimeException(String.format("Asked to remove old shuffle 
merged data for " +
+"application %s shuffleId %s shuffleMergeId %s, but current 
shuffleMergeId %s ",
+msg.appId, msg.shuffleId, msg.shuffleMergeId, 
mergePartitionsInfo.shuffleMergeId));
+  } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
+// cleanup request for newer shuffle - remove the outdated data we 
have.
+submitCleanupTask(() -> {
+  closeAndDeleteOutdatedPartitions(
+  currentAppAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions);
+  writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);

Review Comment:
   I am not sure I understand the concern.
   
   `closeAndDeleteOutdatedPartitions` does two things -
   * clean up finalization details from the DB.
   * clean up files from disk.
   
   Both of these can be done lazily.
   We are only keeping `appShuffleInfo.shuffles` consistent with the DB, and using that to filter during recovery.
   
   Note - there is a bug in `finalizeShuffleMerge`, as I mentioned earlier [here](https://github.com/apache/spark/pull/37922#discussion_r990753031) (see the last part of my comment) ... so please keep that in mind when analyzing this codepath (SPARK-41792).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-30 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1059526338


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+if (appShuffleInfo.attemptId != msg.appAttemptId) {
+  throw new IllegalArgumentException(
+  String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+  + "with the current attempt id %s stored in shuffle service 
for application %s",
+  msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+}
+appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+  if (mergePartitionsInfo == null) {
+if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+  return null;
+} else {
+  writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+  return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+}
+  }
+  boolean deleteAllMergedShuffle =
+  msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
+  msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+  int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ?
+  msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId;
+  AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+  new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, 
mergePartitionsInfo.shuffleMergeId);

Review Comment:
   Misread the type, my bad.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-30 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1059526011


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+if (appShuffleInfo.attemptId != msg.appAttemptId) {
+  throw new IllegalArgumentException(
+  String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+  + "with the current attempt id %s stored in shuffle service 
for application %s",
+  msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+}
+appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+  boolean deleteCurrent =
+  msg.shuffleMergeId == DELETE_CURRENT_MERGED_SHUFFLE_ID ||
+  msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+  AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+  new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, 
mergePartitionsInfo.shuffleMergeId);
+  AppAttemptShuffleMergeId appAttemptShuffleMergeId = new 
AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
+  if(deleteCurrent) {
+// request to clean up shuffle we are currently hosting
+if (!mergePartitionsInfo.isFinalized()) {
+  submitCleanupTask(() -> {
+closeAndDeleteOutdatedPartitions(
+currentAppAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions);
+writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+  });
+} else {
+  submitCleanupTask(() -> {
+deleteMergedFiles(currentAppAttemptShuffleMergeId,
+mergePartitionsInfo.getReduceIds());
+writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+mergePartitionsInfo.setReduceIds(new int[0]);
+  });
+}
+  } else if(msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
+throw new RuntimeException(String.format("Asked to remove old shuffle 
merged data for " +
+"application %s shuffleId %s shuffleMergeId %s, but current 
shuffleMergeId %s ",
+msg.appId, msg.shuffleId, msg.shuffleMergeId, 
mergePartitionsInfo.shuffleMergeId));
+  } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
+// cleanup request for newer shuffle - remove the outdated data we 
have.
+submitCleanupTask(() -> {
+  closeAndDeleteOutdatedPartitions(
+  currentAppAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions);
+  writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);

Review Comment:
   I am not sure I understand the concern.
   
   `closeAndDeleteOutdatedPartitions` does two things -
   * clean up finalization details from the DB.
   * clean up files from disk.
   
   Both of these can be done lazily.
   We are only keeping `appShuffleInfo.shuffles` consistent with the DB, and using that to filter during recovery.
   
   Note - there is a bug in `finalizeShuffleMerge`, as I mentioned earlier [here](https://github.com/apache/spark/pull/37922#discussion_r990753031) (see the last part of my comment) ... so please keep that in mind when analyzing this codepath.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-27 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1057911122


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+if (appShuffleInfo.attemptId != msg.appAttemptId) {
+  throw new IllegalArgumentException(
+  String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+  + "with the current attempt id %s stored in shuffle service 
for application %s",
+  msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+}
+appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+  if (mergePartitionsInfo == null) {
+if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+  return null;
+} else {
+  writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+  return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+}
+  }
+  boolean deleteAllMergedShuffle =

Review Comment:
   nit: `deleteAllMergedShuffle` -> `deleteCurrentMergedShuffle`



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+if (appShuffleInfo.attemptId != msg.appAttemptId) {
+  throw new IllegalArgumentException(
+  String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+  + "with the current attempt id %s stored in shuffle service 
for application %s",
+  msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+}
+appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+  if (mergePartitionsInfo == null) {
+if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+  return null;
+} else {
+  writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+  return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+}
+  }
+  boolean deleteAllMergedShuffle =
+  msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
+  msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+  int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ?
+  msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId;
+  AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+  new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, 
mergePartitionsInfo.shuffleMergeId);

Review Comment:
   This is `mergePartitionsInfo` itself - replace 
`currentAppAttemptShuffleMergeId` with it ? (or rename `mergePartitionsInfo` as 
`currentAppAttemptShuffleMergeId` if we are going for clarity ?)
   



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+if (appShuffleInfo.attemptId != msg.appAttemptId) {
+  throw new IllegalArgumentException(
+  String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+  + "with the current attempt id %s stored in shuffle service 
for application %s",
+  msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+}
+appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+  if (mergePartitionsInfo == null) {
+if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+  return null;
+} else {
+  writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+  return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+}
+  }
+  boolean deleteAllMergedShuffle =
+  msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
+  msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+  int shuffleMergeId = msg.shuffleMergeId != 

[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-21 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1053970574


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+if (appShuffleInfo.attemptId != msg.appAttemptId) {
+  throw new IllegalArgumentException(
+  String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+  + "with the current attempt id %s stored in shuffle service 
for application %s",
+  msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+}
+appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {

Review Comment:
   We have to keep track of it even if it is absent - so that a late push does not end up creating/updating it.
   [This comment](https://github.com/apache/spark/pull/37922#discussion_r990753031) sketches the details (note, please don't use it as-is - there is a minor bug when writing to the DB when the msg is for -1, since that was not considered in that snippet :) )
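   A simplified, self-contained sketch of why `compute` (rather than `computeIfPresent`) is needed here - the absent case still has to record something so a late push cannot recreate the shuffle. Types are stand-ins, not the real `AppShuffleMergePartitionsInfo`:
   ```java
   import java.util.concurrent.ConcurrentHashMap;

   // Stand-in types: record a sentinel for an absent shuffle so a straggling
   // push for it is rejected later. computeIfPresent() would skip the null case
   // entirely and lose that protection.
   class RemoveMergeSketch {
     private static final int DELETE_ALL = -1;
     private final ConcurrentHashMap<Integer, Integer> shuffleToMergeId = new ConcurrentHashMap<>();

     void remove(int shuffleId, int requestedMergeId) {
       shuffleToMergeId.compute(shuffleId, (id, currentMergeId) -> {
         if (currentMergeId == null) {
           if (requestedMergeId == DELETE_ALL) {
             // Nothing hosted and nothing specific requested: keep no entry.
             return null;
           }
           // Remember the requested merge id so a late push for it is rejected.
           return requestedMergeId;
         }
         // ... handling for an existing entry elided ...
         return currentMergeId;
       });
     }
   }
   ```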



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-21 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1053970574


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+if (appShuffleInfo.attemptId != msg.appAttemptId) {
+  throw new IllegalArgumentException(
+  String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+  + "with the current attempt id %s stored in shuffle service 
for application %s",
+  msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+}
+appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {

Review Comment:
   We have to keep track of it even if it is absent - so that a late push does 
not end up creating/updating it.
   [This 
comment](https://github.com/apache/spark/pull/37922#discussion_r990753031) 
sketches the details.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-12-21 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1054099320


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+if (appShuffleInfo.attemptId != msg.appAttemptId) {
+  throw new IllegalArgumentException(
+  String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+  + "with the current attempt id %s stored in shuffle service 
for application %s",
+  msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+}
+appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+  boolean deleteCurrent =
+  msg.shuffleMergeId == DELETE_CURRENT_MERGED_SHUFFLE_ID ||
+  msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+  AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+  new AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, 
mergePartitionsInfo.shuffleMergeId);
+  AppAttemptShuffleMergeId appAttemptShuffleMergeId = new 
AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
+  if(deleteCurrent) {
+// request to clean up shuffle we are currently hosting
+if (!mergePartitionsInfo.isFinalized()) {
+  submitCleanupTask(() -> {
+closeAndDeleteOutdatedPartitions(
+currentAppAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions);
+writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+  });
+} else {
+  submitCleanupTask(() -> {
+deleteMergedFiles(currentAppAttemptShuffleMergeId,
+mergePartitionsInfo.getReduceIds());
+writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+mergePartitionsInfo.setReduceIds(new int[0]);
+  });
+}
+  } else if(msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
+throw new RuntimeException(String.format("Asked to remove old shuffle 
merged data for " +
+"application %s shuffleId %s shuffleMergeId %s, but current 
shuffleMergeId %s ",
+msg.appId, msg.shuffleId, msg.shuffleMergeId, 
mergePartitionsInfo.shuffleMergeId));
+  } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
+// cleanup request for newer shuffle - remove the outdated data we 
have.
+submitCleanupTask(() -> {
+  closeAndDeleteOutdatedPartitions(
+  currentAppAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions);
+  writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);

Review Comment:
   As mentioned above, `writeAppAttemptShuffleMergeInfoToDB` should be done within the critical section.
   We expect this table to be consistent with `appShuffleInfo.shuffles`.
   
   Let us actually move the DB update to the bottom - it is going to be required for all the code paths (since we error out if the message is outdated).
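   A rough sketch of that shape, with stand-in types and method names (not the actual resolver code): one DB write at the bottom of the `compute()` lambda, inside the critical section, while the file cleanup itself stays on the async executor.
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   // Stand-in types only, and simplified (no wildcard merge id handling): the
   // DB write is shared by every non-error branch and runs under the map lock.
   class DbConsistencySketch {
     private final ConcurrentHashMap<Integer, Integer> shuffles = new ConcurrentHashMap<>();
     private final ExecutorService cleanupExecutor = Executors.newSingleThreadExecutor();

     void removeShuffleMerge(int shuffleId, int requestedMergeId) {
       shuffles.compute(shuffleId, (id, currentMergeId) -> {
         if (currentMergeId != null && requestedMergeId < currentMergeId) {
           // Outdated request: error out before touching the DB.
           throw new RuntimeException("Asked to remove an outdated shuffle merge id");
         }
         cleanupExecutor.submit(() -> deleteFiles(shuffleId));   // lazy, off the lock
         writeShuffleMergeInfoToDB(shuffleId, requestedMergeId); // immediate, in the lock
         return requestedMergeId;
       });
     }

     private void deleteFiles(int shuffleId) { /* elided */ }

     private void writeShuffleMergeInfoToDB(int shuffleId, int mergeId) { /* elided */ }
   }
   ```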



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -470,6 +527,39 @@ void closeAndDeleteOutdatedPartitions(
   });
   }
 
+  void deleteMergedFiles(AppAttemptShuffleMergeId appAttemptShuffleMergeId, 
int[] reduceIds) {
+removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
+AppShuffleInfo appShuffleInfo = 
validateAndGetAppShuffleInfo(appAttemptShuffleMergeId.appId);

Review Comment:
   This call is happening in a thread pool and can be delayed - it is possible that appShuffleInfo was updated to a different attempt id by this time.
   Pass this in from the caller.
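   A sketch of what that could look like (signatures are illustrative stand-ins; the later revision of the PR similarly passes `appShuffleInfo` into `deleteMergedFiles`, as visible in the test above): the caller resolves the current `AppShuffleInfo` up front and hands it to the async task instead of re-resolving it inside the thread pool.
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   // Illustrative stand-ins: the caller captures the AppShuffleInfo it already
   // validated and passes it to the delayed cleanup task, so a later attempt-id
   // change cannot be picked up inside the thread pool.
   class PassAppShuffleInfoSketch {
     static final class AppShuffleInfo {
       final int attemptId;
       AppShuffleInfo(int attemptId) { this.attemptId = attemptId; }
     }

     private final ExecutorService cleanupExecutor = Executors.newSingleThreadExecutor();

     void scheduleDelete(AppShuffleInfo appShuffleInfo, int shuffleId, int[] reduceIds) {
       // appShuffleInfo is resolved by the caller, not looked up again in the task.
       cleanupExecutor.submit(() -> deleteMergedFiles(appShuffleInfo, shuffleId, reduceIds));
     }

     private void deleteMergedFiles(AppShuffleInfo appShuffleInfo, int shuffleId, int[] reduceIds) {
       // Resolve data/index/meta files via the passed-in appShuffleInfo ... (elided)
     }
   }
   ```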



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+if (appShuffleInfo.attemptId != msg.appAttemptId) {
+  throw new IllegalArgumentException(
+  String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+  + "with the current attempt id %s stored in shuffle service 
for application %s",
+  msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+}
+appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+  boolean deleteCurrent =
+  msg.shuffleMergeId == 

[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-11-20 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1027637860


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -393,6 +394,35 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+String appId = msg.appId;
+int appAttemptId = msg.appAttemptId;
+int shuffleId = msg.shuffleId;
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+if (appShuffleInfo.attemptId != appAttemptId) {
+  throw new IllegalArgumentException(
+  String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+  + "with the current attempt id %s stored in shuffle service 
for application %s",
+  appAttemptId, appShuffleInfo.attemptId, appId));
+}
+
+appShuffleInfo.shuffles.compute(shuffleId, (shuffleIdKey, partitionsInfo) 
-> {
+  if (null != partitionsInfo) {

Review Comment:
   The validation related to `shuffleMergeId`, from `finalizeShuffleMerge`, is 
applicable here depending on `shuffleMergeId`.
   I have given the details 
[here](https://github.com/apache/spark/pull/37922#discussion_r990753031) - 
please refer to it.



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java:
##
@@ -84,4 +85,9 @@ public MergedBlockMeta getMergedBlockMeta(
   public String[] getMergedBlockDirs(String appId) {
 throw new UnsupportedOperationException("Cannot handle shuffle block 
merge");
   }
+
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge removeShuffleMerge) {
+throw new UnsupportedOperationException("Cannot handle shuffle block 
merge");

Review Comment:
   nit: 
   ```suggestion
   throw new UnsupportedOperationException("Cannot handle merged shuffle 
remove");
   ```



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java:
##
@@ -121,6 +122,13 @@ MergedBlockMeta getMergedBlockMeta(
*/
   String[] getMergedBlockDirs(String appId);
 
+  /**
+   * Remove shuffle merge data files.
+   *
+   * @param removeShuffleMerge Remove shuffle merge RPC

Review Comment:
   ```suggestion
  * @param removeShuffleMerge contains shuffle details (appId, shuffleId, 
etc) to uniquely identify a shuffle to be removed
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-10-09 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990820843


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -650,24 +666,28 @@ public MergeStatuses 
finalizeShuffleMerge(FinalizeShuffleMerge msg) {
 } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
   // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId 
then return
   // empty MergeStatuses but cleanup the older shuffleMergeId files.
+  Map shuffleMergePartitions =
+  mergePartitionsInfo.shuffleMergePartitions;
   submitCleanupTask(() ->
-  closeAndDeleteOutdatedPartitions(
-  appAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions));
+  closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, 
shuffleMergePartitions));
 } else {

Review Comment:
   See my note above on how to handle this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-10-09 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990763769


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int 
shuffleMergeId) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+AppShuffleMergePartitionsInfo partitionsInfo = 
appShuffleInfo.shuffles.remove(shuffleId);

Review Comment:
   I added `deleteMergedFiles` above since `closeAndDeleteOutdatedPartitions` loses the reference to existing files once finalized [1], as @wankunde has observed in his current PR.
   
   Why is this not an issue currently ?
   
   a) We are not deleting files currently when finalized - `closeAndDeleteOutdatedPartitions` is mainly trying to clean up open fds. It deletes files only if they were not finalized (and so no one can read them - they are partial).
   b) During application termination, we go and remove the parent directories and delete files recursively - so nothing is left once we are done.
   
   With this PR, we will need to change the behavior to also delete older/existing files even if finalized.
   That would mean that when a new shuffle merge id starts, the older files should get deleted.
   I have sketched an option above - from a deletion point of view, we will need to modify the existing codepaths which rely on `closeAndDeleteOutdatedPartitions` to suitably delete using `deleteMergedFiles`.
   
   Thinking more, this can probably live within `closeAndDeleteOutdatedPartitions` itself.
   If `isFinalized` - use the logic of `deleteMergedFiles` as detailed, else use the existing logic ?
   It will require changes to existing code though - we can split that out as follow-up work @wankunde, so that this PR does not balloon in size.
   
   Thoughts @otterc, @zhouyejoe, @wankunde ?
   
   [1] Trying to keep track of the files by preserving the existing entry, as this PR currently does, does not work - it will be lost during a restart of the NM (it is not in the DB), it would be expensive to maintain in memory long term, and the information is available on disk anyway - `getReducerIds()` above should suffice to recreate the full list of files.
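   As a compact illustration of the option above (names simplified, not the actual implementation): a single cleanup entry point that branches on whether the merge was finalized.
   ```java
   import java.util.List;

   // Simplified stand-ins: finalized merges delete the per-reduce-id files
   // (recoverable from disk), non-finalized ones keep the existing behavior of
   // closing open partitions and deleting the partial files.
   class OutdatedPartitionCleanupSketch {
     void closeAndDeleteOutdatedPartitions(boolean isFinalized, List<Integer> reduceIds) {
       if (isFinalized) {
         deleteMergedFiles(reduceIds);
       } else {
         closePartialPartitionsAndDelete();
       }
     }

     private void deleteMergedFiles(List<Integer> reduceIds) { /* elided */ }

     private void closePartialPartitionsAndDelete() { /* elided */ }
   }
   ```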



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-10-09 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990763769


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int 
shuffleMergeId) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+AppShuffleMergePartitionsInfo partitionsInfo = 
appShuffleInfo.shuffles.remove(shuffleId);

Review Comment:
   I added `deleteMergedFiles` above since `closeAndDeleteOutdatedPartitions` loses the reference to existing files once finalized [1], as @wankunde has observed in his current PR.
   
   Why is this not an issue currently ?
   
   a) We are not deleting files currently when finalized - `closeAndDeleteOutdatedPartitions` is mainly trying to clean up open fds. It deletes files only if they were not finalized (and so no one can read them - they are partial).
   b) During application termination, we go and remove the parent directories and delete files recursively - so nothing is left once we are done.
   
   
   With this PR, we will need to change behavior to also delete older/existing 
files even if finalized.
   That would mean, when a new shuffle mergeid starts, the older files should 
get deleted.
   I have sketched an option above - from a deletion point of view - we will 
need to modify existing codepaths which are relying on 
`closeAndDeleteOutdatedPartitions` to suitably delete.
   
   Thinking more, probably it can be within `closeAndDeleteOutdatedPartitions` 
itself.
   If `isFinalized` - use logic of `deleteMergedFiles` as detailed, else use 
existing logic ?
   Will require changes to existing code though - we can split that as a follow 
up work @wankunde, so that this PR does not balloon in size.
   
   Thoughts @otterc, @zhouyejoe, @wankunde ?
   
   [1] Trying to keep track of the files, by preserving existing entry as in 
this PR currently, does not work - it will be lost during restart of NM (not in 
DB), will be expensive to maintain in long term in memory and this is available 
on disk anyway - `getReducerIds()` above should suffice to recreate the full 
list of files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-10-09 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990763769


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int 
shuffleMergeId) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+AppShuffleMergePartitionsInfo partitionsInfo = 
appShuffleInfo.shuffles.remove(shuffleId);

Review Comment:
   I added `deleteMergedFiles` above since `closeAndDeleteOutdatedPartitions` loses the reference to existing files once finalized [1], as @wankunde has observed in his current PR.
   
   Why is this not an issue currently ?
   
   a) We are not deleting files currently when finalized - `closeAndDeleteOutdatedPartitions` is mainly trying to clean up open fds. It deletes files only if they were not finalized (and so no one can read them - they are partial).
   b) During application termination, we go and remove the parent directories and delete files recursively - so nothing is left once we are done.
   
   
   With this PR, we will need to change behavior to also delete older/existing 
files even if finalized.
   That would mean, when a new shuffle mergeid starts, the older files should 
get deleted.
   I have sketched an option above - from a deletion point of view - we will 
need to modify existing codepaths which are relying on 
`closeAndDeleteOutdatedPartitions` to suitably delete.
   
   Thinking more, probably it can be within `closeAndDeleteOutdatedPartitions` 
itself.
   If `isFinalized` - use logic of `deleteMergedFiles`, else use existing logic 
?
   Will require changes to existing code though - we can split that as a follow 
up work @wankunde, so that this PR does not balloon in size.
   
   Thoughts @otterc, @zhouyejoe, @wankunde ?
   
   [1] Trying to keep track of the files by preserving the existing entry, as 
this PR currently does, does not work - it will be lost during a restart of the 
NM (it is not in the DB), it will be expensive to maintain in memory over the 
long term, and it is available on disk anyway - `getReducerIds()` above should 
suffice to recreate the full list of files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-10-09 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990763769


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int 
shuffleMergeId) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+AppShuffleMergePartitionsInfo partitionsInfo = 
appShuffleInfo.shuffles.remove(shuffleId);

Review Comment:
   I added `deleteMergedFiles` above since `closeAndDeleteOutdatedPartitions` 
loses the reference to existing files when finalized [1], as @wankunde has 
observed in his current PR.
   
   Why is this not an issue currently ?
   
   a) We are not deleting files currently when finalized - 
`closeAndDeleteOutdatedPartitions` is mainly trying to clean up open fds. It 
deletes files only if they were not finalized (and so no one can read them - 
they are partial).
   b) During application termination, we go and remove the parent directories 
and delete files recursively - so nothing is left once we are done.
   
   
   With this PR, we will need to change behavior to also delete older/existing 
files even if finalized.
   I have sketched an option above - from a deletion point of view - we will 
need to modify existing codepaths which are relying on 
`closeAndDeleteOutdatedPartitions` to suitably delete.
   
   Thinking more, probably it can be within `closeAndDeleteOutdatedPartitions` 
itself.
   If `isFinalized` - use logic of `deleteMergedFiles`, else use existing logic 
?
   Will require changes to existing code though - we can split that out as 
follow-up work @wankunde, so that this PR does not balloon in size.
   
   Thoughts @otterc, @zhouyejoe, @wankunde ?
   
   [1] Trying to keep track of the files by preserving the existing entry, as 
this PR currently does, does not work - it will be lost during a restart of the 
NM (it is not in the DB), it will be expensive to maintain in memory over the 
long term, and it is available on disk anyway - `getReducerIds()` above should 
suffice to recreate the full list of files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-10-09 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990763769


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int 
shuffleMergeId) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+AppShuffleMergePartitionsInfo partitionsInfo = 
appShuffleInfo.shuffles.remove(shuffleId);

Review Comment:
   I added `deleteMergedFiles` above since `closeAndDeleteOutdatedPartitions` 
loses the reference to existing files when finalized [1], as @wankunde has 
observed in his current PR.
   
   Why is this not an issue currently ?
   
   a) We are not deleting files currently when finalized - 
`closeAndDeleteOutdatedPartitions` is mainly trying to clean up open fds. It 
deletes files only if they were not finalized (and so no one can read them - 
they are partial).
   b) During application termination, we go and remove the parent directories 
and delete files recursively - so nothing is left once we are done.
   
   
   With this PR, we will need to change behavior to also delete older/existing 
files even if finalized.
   I have sketched an option above - from a deletion point of view - we will 
need to modify existing codepaths which are relying on 
`closeAndDeleteOutdatedPartitions` to suitably delete.
   
   Thinking more, probably it can be within `closeAndDeleteOutdatedPartitions` 
itself.
   If `isFinalized` - use logic of `deleteMergedFiles`, else use existing logic 
?
   Will require changes to existing code though.
   
   Thoughts @otterc, @zhouyejoe, @wankunde ?
   
   [1] Trying to keep track of the files by preserving the existing entry, as 
this PR currently does, does not work - it will be lost during a restart of the 
NM (it is not in the DB), it will be expensive to maintain in memory over the 
long term, and it is available on disk anyway - `getReducerIds()` above should 
suffice to recreate the full list of files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-10-09 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990763769


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int 
shuffleMergeId) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+AppShuffleMergePartitionsInfo partitionsInfo = 
appShuffleInfo.shuffles.remove(shuffleId);

Review Comment:
   I added `deleteMergedFiles` above since `closeAndDeleteOutdatedPartitions` 
loses the reference to existing files when finalized [1], as @wankunde has 
observed in his current PR.
   
   Why is this not an issue currently ?
   
   a) We are not deleting files currently when finalized - 
`closeAndDeleteOutdatedPartitions` is mainly trying to clean up open fds. It 
deletes files only if they were not finalized (and so no one can read them - 
they are partial).
   b) During application termination, we go and remove the parent directories 
and delete files recursively - so nothing is left once we are done.
   
   
   With this PR, we will need to change behavior to also delete older/existing 
files even if finalized.
   I have sketched an option above - from a deletion point of view - we will 
need to modify existing codepaths which are relying on 
`closeAndDeleteOutdatedPartitions` to suitably delete.
   
   Thinking more, probably it can be within `closeAndDeleteOutdatedPartitions` 
itself.
   
   If `isFinalized` - use logic of `deleteMergedFiles`, else use existing logic 
?
   
   Thoughts @otterc, @zhouyejoe, @wankunde ?
   
   [1] Trying to keep track of the files by preserving the existing entry, as 
this PR currently does, does not work - it will be lost during a restart of the 
NM (it is not in the DB), it will be expensive to maintain in memory over the 
long term, and it is available on disk anyway - `getReducerIds()` above should 
suffice to recreate the full list of files.



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int 
shuffleMergeId) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+AppShuffleMergePartitionsInfo partitionsInfo = 
appShuffleInfo.shuffles.remove(shuffleId);

Review Comment:
   I added `deleteMergedFiles` above since `closeAndDeleteOutdatedPartitions` 
loses the reference to existing files when finalized [1], as @wankunde has 
observed in his current PR.
   
   Why is this not an issue currently ?
   
   a) We are not deleting files currently when finalized - 
`closeAndDeleteOutdatedPartitions` is mainly trying to clean up open fds. It 
deletes files only if they were not finalized (and so no one can read them - 
they are partial).
   b) During application termination, we go and remove the parent directories 
and delete files recursively - so nothing is left once we are done.
   
   
   With this PR, we will need to change behavior to also delete older/existing 
files even if finalized.
   I have sketched an option above - from a deletion point of view - we will 
need to modify existing codepaths which are relying on 
`closeAndDeleteOutdatedPartitions` to suitably delete.
   
   Thinking more, probably it can be within `closeAndDeleteOutdatedPartitions` 
itself.
   If `isFinalized` - use logic of `deleteMergedFiles`, else use existing logic 
?
   
   Thoughts @otterc, @zhouyejoe, @wankunde ?
   
   [1] Trying to keep track of the files by preserving the existing entry, as 
this PR currently does, does not work - it will be lost during a restart of the 
NM (it is not in the DB), it will be expensive to maintain in memory over the 
long term, and it is available on disk anyway - `getReducerIds()` above should 
suffice to recreate the full list of files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-10-09 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990763769


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int 
shuffleMergeId) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+AppShuffleMergePartitionsInfo partitionsInfo = 
appShuffleInfo.shuffles.remove(shuffleId);

Review Comment:
   I added `deleteMergedFiles` above since `closeAndDeleteOutdatedPartitions` 
loses the reference to existing files when finalized [1], as @wankunde has 
observed in his current PR.
   
   Why is this not an issue currently ?
   
   a) We are not deleting files currently when finalized - 
`closeAndDeleteOutdatedPartitions` is mainly trying to clean up open fds. It 
deletes files only if they were not finalized (and so no one can read them - 
they are partial).
   b) During application termination, we go and remove the parent directories 
and delete files recursively - so nothing is left once we are done.
   
   
   With this PR, we will need to change behavior to also delete older/existing 
files even if finalized.
   I have sketched an option above - from a deletion point of view.
   
   Thoughts @otterc, @zhouyejoe, @wankunde ?
   
   [1] Trying to keep track of the files by preserving the existing entry, as 
this PR currently does, does not work - it will be lost during a restart of the 
NM (it is not in the DB), it will be expensive to maintain in memory over the 
long term, and it is available on disk anyway - `getReducerIds()` above should 
suffice to recreate the full list of files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-10-09 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990756966


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -1410,26 +1431,27 @@ public String toString() {
* required for the shuffles of indeterminate stages.
*/
   public static class AppShuffleMergePartitionsInfo {

Review Comment:
   Revert changes to this class ? (assuming the changes I sketched above are 
fine)



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -650,24 +666,28 @@ public MergeStatuses 
finalizeShuffleMerge(FinalizeShuffleMerge msg) {
 } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
   // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId 
then return
   // empty MergeStatuses but cleanup the older shuffleMergeId files.
+  Map shuffleMergePartitions =
+  mergePartitionsInfo.shuffleMergePartitions;
   submitCleanupTask(() ->
-  closeAndDeleteOutdatedPartitions(
-  appAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions));
+  closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, 
shuffleMergePartitions));
 } else {
   // This block covers:
   //  1. finalization of determinate stage
   //  2. finalization of indeterminate stage if the shuffleMergeId 
related to it is the one
   //  for which the message is received.
   
shuffleMergePartitionsRef.set(mergePartitionsInfo.shuffleMergePartitions);
 }
+  } else {
+mergePartitionsInfo = new AppShuffleMergePartitionsInfo(shuffleId, 
true);
   }
+  mergePartitionsInfo.setFinalized(true);
   // Update the DB for the finalized shuffle
   writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
   // Even when the mergePartitionsInfo is null, we mark the shuffle as 
finalized but the results
   // sent to the driver will be empty. This can happen when the service 
didn't receive any
   // blocks for the shuffle yet and the driver didn't wait for enough time 
to finalize the
   // shuffle.
-  return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+  return mergePartitionsInfo;

Review Comment:
   revert changes to this method ? (assuming the changes I sketched above are 
fine)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-10-09 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990763769


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int 
shuffleMergeId) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+AppShuffleMergePartitionsInfo partitionsInfo = 
appShuffleInfo.shuffles.remove(shuffleId);

Review Comment:
   I added `deleteMergedFiles` above since `closeAndDeleteOutdatedPartitions` 
loses the reference to existing files when finalized [1], as @wankunde has 
observed in his current PR.
   
   Why is this not an issue currently ?
   
   a) We are not deleting files currently - `closeAndDeleteOutdatedPartitions` 
is mainly trying to clean up open fds. It deletes files only if they were not 
finalized (and so no one can read them - they are partial).
   b) During application termination, we go and remove the parent directories 
and delete files recursively - so nothing is left once we are done.
   
   
   With this PR, we will need to change behavior to also delete older/existing 
files even if finalized.
   I have sketched an option above - from a deletion point of view.
   
   Thoughts @otterc, @zhouyejoe, @wankunde ?
   
   [1] Trying to keep track of the files by preserving the existing entry, as 
this PR currently does, does not work - it will be lost during a restart of the 
NM (it is not in the DB), it will be expensive to maintain in memory over the 
long term, and it is available on disk anyway - `getReducerIds()` above should 
suffice to recreate the full list of files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-10-09 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990763769


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int 
shuffleMergeId) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+AppShuffleMergePartitionsInfo partitionsInfo = 
appShuffleInfo.shuffles.remove(shuffleId);

Review Comment:
   I added `deleteMergedFiles` above since `closeAndDeleteOutdatedPartitions` 
loses the reference to existing files when finalized [1], as @wankunde has 
observed in his current PR.
   
   Why is this not an issue currently ?
   
   a) We are not deleting files currently - `closeAndDeleteOutdatedPartitions` 
is mainly trying to clean up open fds. It deletes files only if they were not 
finalized (and so no one can read them - they are partial).
   b) During application termination, we go and remove the parent directories 
and delete files recursively - so nothing is left once we are done.
   
   
   With this PR, we will need to change behavior to also delete older/existing 
files even if finalized.
   I have sketched an option above - from a deletion point of view.
   
   Thoughts @otterc, @zhouyejoe, @wankunde ?
   
   [1] Trying to keep track of the files by preserving the existing entry, as 
this PR currently does, does not work - it will be lost during a restart of the 
NM (it is not in the DB), it will be expensive to maintain over the long term, 
and it is available on disk anyway - `getReducerIds()` above should suffice to 
recreate the full list of files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-10-09 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990763769


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int 
shuffleMergeId) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+AppShuffleMergePartitionsInfo partitionsInfo = 
appShuffleInfo.shuffles.remove(shuffleId);

Review Comment:
   I added `deleteMergedFiles` above since `closeAndDeleteOutdatedPartitions` 
loses the reference to existing files when finalized [1], as @wankunde has 
observed in his current PR.
   
   Why is this not an issue currently ?
   
   a) We are not deleting files currently - `closeAndDeleteOutdatedPartitions` 
is mainly trying to clean up open fds. It deletes files only if they were not 
finalized (and so no one can read them - they are partial).
   b) During application termination, we go and remove the parent directories 
and delete files recursively - so nothing is left once we are done.
   
   
   With this PR, we will need to change behavior to also delete older/existing 
files even if finalized.
   I have sketched an option above - from a deletion point of view.
   
   Thoughts @otterc, @zhouyejoe, @wankunde ?
   
   [1] Trying to keep track of the files does not work - it will be lost during 
a restart of the NM (it is not in the DB), and it is available on disk anyway - 
`getReducerIds()` above should suffice to recreate the full list of files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-10-09 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990763769


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int 
shuffleMergeId) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+AppShuffleMergePartitionsInfo partitionsInfo = 
appShuffleInfo.shuffles.remove(shuffleId);

Review Comment:
   I added `deleteMergedFiles` above since `closeAndDeleteOutdatedPartitions` 
loses the reference to existing files, as @wankunde has observed in his current 
PR.
   
   Why is this not an issue currently ?
   
   a) We are not deleting files currently - `closeAndDeleteOutdatedPartitions` 
is mainly trying to clean up open fds. It deletes files only if they were not 
finalized (and so no one can read them - they are partial).
   b) During application termination, we go and remove the parent directories 
and delete files recursively - so nothing is left once we are done.
   
   
   With this PR, we will need to change behavior to also delete older/existing 
files even if finalized.
   I have sketched an option above - from a deletion point of view.
   
   Thoughts @otterc, @zhouyejoe, @wankunde ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-10-09 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990753031


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int 
shuffleMergeId) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+AppShuffleMergePartitionsInfo partitionsInfo = 
appShuffleInfo.shuffles.remove(shuffleId);

Review Comment:
   We should not remove it directly - the value within the map could be for a 
different `shuffleMergeId` (newer for example).
   Take a look at `finalizeShuffleMerge` to see how to handle the corner cases.
   
   Rough sketch is:
   
   ```
 public void removeShuffleMerge(FinalizeShuffleMerge msg) {
   AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
   if (appShuffleInfo.attemptId != msg.appAttemptId) {
 // incoming request for different app attempt - exception
 throw new IllegalArgumentException("appropriate msg for invalid app 
attempt");
   }
   
   appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
   
 if (null != mergePartitionsInfo) {
   // where DELETE_CURRENT == -1
   // merge id will be set to -1 when we are cleaning up shuffle, and 
there is no chance of its reuse -
   // else it will be set to an explicit value.
   boolean deleteAny = msg.shuffleMergeId == DELETE_CURRENT;
   
   // Looks like there is a bug in finalizeShuffleMerge, let us fix it 
here anyway
   // and handle it for finalizeShuffleMerge in a different PR.
    AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new 
AppAttemptShuffleMergeId(
   msg.appId, msg.appAttemptId, 
   msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
   if (!deleteAny && msg.shuffleMergeId < 
mergePartitionsInfo.shuffleMergeId) {
   
 // throw exception - request for an older shuffle merge id
 throw new RuntimeException("appropriate msg for delete of old 
merge id");
   } else if (!deleteAny && msg.shuffleMergeId > 
mergePartitionsInfo.shuffleMergeId) {
   
 // cleanup request for newer shuffle - remove the outdated data we 
have.
 submitCleanupTask(() ->
 closeAndDeleteOutdatedPartitions(
 currentAppAttemptShuffleMergeId,
 mergePartitionsInfo.shuffleMergePartitions));
   } else {
   
 // request to cleanup shuffle we are currently hosting
   
 // Not yet finalized - use the existing cleanup mechanism
 if (!mergePartitionsInfo.isFinalized()) {
   submitCleanupTask(() ->
   closeAndDeleteOutdatedPartitions(
   currentAppAttemptShuffleMergeId,
   mergePartitionsInfo.shuffleMergePartitions));
 } else {
   
   if (! mergePartitionsInfo.getReduceIds().isEmpty()) {
 // Introduce new method which deletes the files for 
shuffleMergeId
 submitCleanupTask(() ->
   deleteMergedFiles(msg.appId, msg.appAttemptId, 
 msg.shuffleId, msg.shuffleMergeId, 
 // To be introduced - see below
 mergePartitionsInfo.getReduceIds()));
   
   }
   // simply return existing entry immediately - db does not need 
updating - we can actually 
   // drop reduce-ids here as an optimization
   return mergePartitionsInfo;
 }
   }
 }
 // keep track of the latest merge id - and mark it as finalized and 
immutable as already marked for deletion/deleted.
 AppAttemptShuffleMergeId appAttemptShuffleMergeId = new 
AppAttemptShuffleMergeId(
   msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
 writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
 // no reduceid's
 return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
   });
 }
   ```
   
   To write up `deleteMergedFiles`, the only thing missing is the set of valid 
reduce ids (`getReduceIds` above) - a sketch of such a helper follows the list 
below.
   We can keep track of that by modifying `finalizeShuffleMerge` as follows:
   
   a) keep reference to response from `appShuffleInfo.shuffles.compute()`
   b) Before returning `mergeStatuses`, update this variable with `reduceIds`
   c) for efficiency, we can convert it to a bitmap and save space - but that 
is an impl detail.
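   
   A possible shape for such a `deleteMergedFiles`, given the reduce ids (the 
file-lookup helpers below are placeholders for whatever resolves the on-disk 
merged data/index/meta paths, not existing methods):
   
   ```
   void deleteMergedFiles(
       String appId, int appAttemptId, int shuffleId, int shuffleMergeId,
       int[] reduceIds) {
     AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
     for (int reduceId : reduceIds) {
       // Placeholder lookups for the merged data/index/meta files of this
       // (shuffleId, shuffleMergeId, reduceId) under the app's local dirs.
       File dataFile = getMergedDataFile(appShuffleInfo, shuffleId, shuffleMergeId, reduceId);
       File indexFile = getMergedIndexFile(appShuffleInfo, shuffleId, shuffleMergeId, reduceId);
       File metaFile = getMergedMetaFile(appShuffleInfo, shuffleId, shuffleMergeId, reduceId);
       for (File file : new File[] {dataFile, indexFile, metaFile}) {
         if (file.exists() && !file.delete()) {
           logger.warn("Failed to delete merged shuffle file {}", file);
         }
       }
     }
   }
   ```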
   
   Thoughts ?
   
   +CC @otterc, @zhouyejoe - please take a look at the bug there should be 

[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-10-09 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990751884


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java:
##
@@ -224,6 +224,12 @@ protected void handleMessage(
   } finally {
 responseDelayContext.stop();
   }
+} else if (msgObj instanceof RemoveShuffleMerge) {
+  RemoveShuffleMerge msg = (RemoveShuffleMerge) msgObj;
+  checkAuth(client, msg.appId);
+  logger.info("Removing shuffle merge data for application {} shuffle {} 
shuffleMerge {}",
+  msg.appId, msg.shuffleId, msg.shuffleMergeId);
+  mergeManager.removeShuffleMerge(msg.appId, msg.shuffleId, 
msg.shuffleMergeId);

Review Comment:
   We need to pass in `appAttemptId` as well for `RemoveShuffleMerge` - I had 
left that comment earlier.
   Take a look at `FinalizeShuffleMerge` and how it is processed, and handle 
this similarly ? (we can pass `RemoveShuffleMerge` to 
`mergeManager.removeShuffleMerge` - and look up the fields from there).
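   
   For example, the branch could hand the whole message over, mirroring how 
`FinalizeShuffleMerge` is handled (a sketch only - it assumes 
`RemoveShuffleMerge` gains an `appAttemptId` field and that 
`removeShuffleMerge` is changed to take the message):
   
   ```
   } else if (msgObj instanceof RemoveShuffleMerge) {
     RemoveShuffleMerge msg = (RemoveShuffleMerge) msgObj;
     checkAuth(client, msg.appId);
     logger.info("Removing shuffle merge data for application {} attempt {} shuffle {}"
         + " shuffleMerge {}", msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
     // Pass the full message so the resolver can also validate appAttemptId.
     mergeManager.removeShuffleMerge(msg);
   }
   ```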



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int 
shuffleMergeId) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);

Review Comment:
   Add validation for `attemptId` here.
   Take a look at `finalizeShuffleMerge` for example



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int 
shuffleMergeId) {
+AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+AppShuffleMergePartitionsInfo partitionsInfo = 
appShuffleInfo.shuffles.remove(shuffleId);

Review Comment:
   We should not remove it directly - the value within the map could be for a 
different `shuffleMergeId` (newer for example).
   Take a look at `finalizeShuffleMerge` to see how to handle the corner cases.
   
   Rough sketch is:
   
   ```
 public void removeShuffleMerge(FinalizeShuffleMerge msg) {
   AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
   if (appShuffleInfo.attemptId != msg.appAttemptId) {
 // incoming request for older app attempt - exception
 throw new IllegalArgumentException("appropriate msg for invalid app 
attempt");
   }
   
   appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
   
 if (null != mergePartitionsInfo) {
   // where DELETE_CURRENT == -1
   // merge id will be set to -1 when we are cleaning up shuffle, and 
there is no chance of its reuse -
   // else it will be set to an explicit value.
   boolean deleteAny = msg.shuffleMergeId == DELETE_CURRENT;
   
   // Looks like there is a bug in finalizeShuffleMerge, let us fix it 
here anyway
   // and handle it for finalizeShuffleMerge in a different PR.
    AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new 
AppAttemptShuffleMergeId(
   msg.appId, msg.appAttemptId, 
   msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
   if (!deleteAny && msg.shuffleMergeId < 
mergePartitionsInfo.shuffleMergeId) {
   
 // throw exception - request for an older shuffle merge id
 throw new RuntimeException("appropriate msg for delete of old 
merge id");
   } else if (!deleteAny && msg.shuffleMergeId > 
mergePartitionsInfo.shuffleMergeId) {
   
 // cleanup request for newer shuffle - remove the outdated data we 
have.
 submitCleanupTask(() ->
 closeAndDeleteOutdatedPartitions(
 currentAppAttemptShuffleMergeId,
 mergePartitionsInfo.shuffleMergePartitions));
   } else {
   
 // request to cleanup shuffle we are currently hosting
   
 // Not yet finalized - use the existing cleanup mechanism
 if (!mergePartitionsInfo.isFinalized()) {
   submitCleanupTask(() ->
   closeAndDeleteOutdatedPartitions(
   currentAppAttemptShuffleMergeId,
   mergePartitionsInfo.shuffleMergePartitions));
 } else {
   
   if (! mergePartitionsInfo.getReduceIds().isEmpty()) {
 // Introduce new method which deletes the files for 
shuffleMergeId
 submitCleanupTask(() ->
   deleteMergedFiles(msg.appId, msg.appAttemptId, 
 msg.shuffleId, 

[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE]Remove push-based shuffle data after query finished

2022-09-18 Thread GitBox


mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r973756877


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java:
##
@@ -255,4 +255,17 @@ public void getMergedBlockMeta(
   MergedBlocksMetaListener listener) {
 throw new UnsupportedOperationException();
   }
+
+  /**
+   * Remove the shuffle merge data in shuffle services
+   *
+   * @param host the host of the remote node.
+   * @param port the port of the remote node.
+   * @param shuffleId shuffle id.
+   *
+   * @since 3.4.0
+   */
+  public boolean removeShuffleMerge(String host, int port, int shuffleId) {

Review Comment:
   Pass the `shuffleMergeId` as well here (and everywhere else below as 
relevant).
   This will make sure the protocol/API is extensible for future use if/when we 
want to clean up a specific merge id.
   
   Use a specific value to indicate cleanup for all/any shuffle merge ids (for 
example `-1`, since we start with `0` for shuffle merge id and bump it up for 
new stage attempts for indeterminate stages).
   
   For now, we can fail the request if `shuffleMergeId` is not `-1` (in 
`RemoteBlockPushResolver.java`) - and progressively add support for cleanup of 
a specific merge id in future versions as we need it: but this will ensure 
protocol changes are not required at that point in time.
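   
   A sketch of what the extended client-side API could then look like (the 
constant name here is only illustrative):
   
   ```
   /** Sentinel meaning: remove merged data for every shuffleMergeId of this shuffle. */
   public static final int DELETE_ALL_MERGED_SHUFFLE = -1;
   
   public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) {
     throw new UnsupportedOperationException();
   }
   ```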



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.spark.network.protocol.Encoders;
+
+/**
+ * Remove the merged data for a given shuffle.
+ * Returns {@link Boolean}
+ *
+ * @since 3.4.0
+ */
+public class RemoveShuffleMerge extends BlockTransferMessage {
+  public final String appId;

Review Comment:
   Add `appAttemptId` as well here.
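   
   For instance, the message could carry the attempt id alongside the existing 
fields (a sketch of the extended shape; encode/decode changes omitted):
   
   ```
   public class RemoveShuffleMerge extends BlockTransferMessage {
     public final String appId;
     public final int appAttemptId;
     public final int shuffleId;
     public final int shuffleMergeId;
   
     public RemoveShuffleMerge(String appId, int appAttemptId, int shuffleId, int shuffleMergeId) {
       this.appId = appId;
       this.appAttemptId = appAttemptId;
       this.shuffleId = shuffleId;
       this.shuffleMergeId = shuffleMergeId;
     }
   }
   ```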



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -393,6 +393,20 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId) {

Review Comment:
   When adding support for `shuffleMergeId`, follow the same pattern as 
`finalizeShuffleMerge` - there are a few corner cases here, and that method 
handles them.



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.spark.network.protocol.Encoders;
+
+/**
+ * Remove the merged data for a given shuffle.
+ * Returns {@link Boolean}
+ *
+ * @since 3.4.0
+ */
+public class RemoveShuffleMerge extends BlockTransferMessage {
+  public final String appId;
+  public final int shuffleId;
+
+  public RemoveShuffleMerge(String appId, int shuffleId) {
+this.appId = appId;
+