[GitHub] [spark] jerrypeng commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

2022-11-04 Thread GitBox


jerrypeng commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1014297681


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##
@@ -277,10 +295,34 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
 for (batchId <- batchIds if batchId > thresholdBatchId) {
   val path = batchIdToPath(batchId)
   fileManager.delete(path)
+  if (metadataCacheEnabled) batchCache.remove(batchId)
   logTrace(s"Removed metadata log file: $path")
 }
   }
 
+
+  /**
+   * List the available batches on file system. As a workaround for S3 
inconsistent list, it also

Review Comment:
   I think this comment is out of date.  Amazon now delivers strong read after 
write consistency.  I will remove in a subsequent PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] jerrypeng commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

2022-11-02 Thread GitBox


jerrypeng commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1011241156


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##
@@ -19,7 +19,9 @@ package org.apache.spark.sql.execution.streaming
 
 import java.io._
 import java.nio.charset.StandardCharsets
+import java.util.{Collections, LinkedHashMap}

Review Comment:
   sure



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] jerrypeng commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

2022-11-02 Thread GitBox


jerrypeng commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1011239497


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##
@@ -64,6 +67,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: 
SparkSession, path:
 fileManager.mkdirs(metadataPath)
   }
 
+  protected val metadataCacheEnabled: Boolean
+  = 
sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_METADATA_CACHE_ENABLED)
+
+  /**
+   * Cache the latest two batches. [[StreamExecution]] usually just accesses 
the latest two batches
+   * when committing offsets, this cache will save some file system operations.
+   */
+  protected[sql] val batchCache = Collections.synchronizedMap(new 
LinkedHashMap[Long, T](2) {

Review Comment:
   We could but the change is not really going to yield much difference.  The 
memory foot print of this is minimal.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] jerrypeng commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

2022-11-02 Thread GitBox


jerrypeng commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1011238096


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##
@@ -277,10 +295,34 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
 for (batchId <- batchIds if batchId > thresholdBatchId) {
   val path = batchIdToPath(batchId)
   fileManager.delete(path)
+  if (metadataCacheEnabled) batchCache.remove(batchId)
   logTrace(s"Removed metadata log file: $path")
 }
   }
 
+
+  /**
+   * List the available batches on file system. As a workaround for S3 
inconsistent list, it also
+   * tries to take `batchCache` into consideration to infer a better answer.
+   */
+  protected def listBatches: Array[Long] = {
+val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+  .map(f => pathToBatchId(f.getPath)) ++
+  // Iterate over keySet is not thread safe. We call `toArray` to make a 
copy in the lock to
+  // elimiate the race condition.
+  batchCache.synchronized {
+batchCache.keySet.asScala.toArray
+  }
+logInfo("BatchIds found from listing: " + batchIds.sorted.mkString(", "))
+
+if (batchIds.isEmpty) {
+  return Array.empty

Review Comment:
   will fix



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] jerrypeng commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

2022-11-01 Thread GitBox


jerrypeng commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1010692304


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##
@@ -277,10 +295,34 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
 for (batchId <- batchIds if batchId > thresholdBatchId) {
   val path = batchIdToPath(batchId)
   fileManager.delete(path)
+  if (metadataCacheEnabled) batchCache.remove(batchId)
   logTrace(s"Removed metadata log file: $path")
 }
   }
 
+
+  /**
+   * List the available batches on file system. As a workaround for S3 
inconsistent list, it also
+   * tries to take `batchCache` into consideration to infer a better answer.
+   */
+  protected def listBatches: Array[Long] = {
+val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+  .map(f => pathToBatchId(f.getPath)) ++
+  // Iterate over keySet is not thread safe. We call `toArray` to make a 
copy in the lock to
+  // elimiate the race condition.
+  batchCache.synchronized {
+batchCache.keySet.asScala.toArray

Review Comment:
   We need to return a scala array not a Java array.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org