HeartSaVioR commented on a change in pull request #31495:
URL: https://github.com/apache/spark/pull/31495#discussion_r571792973



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
##########
@@ -46,6 +47,23 @@ import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
 class OffsetSeqLog(sparkSession: SparkSession, path: String)
   extends HDFSMetadataLog[OffsetSeq](sparkSession, path) {
 
+  private val cachedMetadata = new ju.TreeMap[Long, OffsetSeq]()
+
+  override def add(batchId: Long, metadata: OffsetSeq): Boolean = {
+    val added = super.add(batchId, metadata)
+    if (added) {
+      // cache metadata as it will be read again
+      cachedMetadata.put(batchId, metadata)
+      // we don't access metadata for (batchId - 2) batches; evict them

Review comment:
       https://gist.github.com/HeartSaVioR/111ed75aa2dc4672e36968c02db83e26
   
   ```
   import java.lang.{Long => JLong}
   import java.util.{ArrayList, Collections, TreeMap}
   
   def c(treeMap: TreeMap[Long, String]): Long = {
     val t1 = System.nanoTime()
     treeMap.put(1, "1")
     treeMap.put(2, "3")
     treeMap.put(3, "3")
     treeMap.headMap(2, true).clear()
     (System.nanoTime() - t1)
   }
   
   def d(treeMap: TreeMap[Long, String], idx: Long, value: String): Long = {
     val t1 = System.nanoTime()
     treeMap.put(idx, value)
     treeMap.headMap(idx - 2, true).clear()
     (System.nanoTime() - t1)
   }
   
   def experimentC(): Unit = {
     val latencies = new ArrayList[JLong]()
     val warmupCount = 1000000
     val runCount = 10000000
   
     (1 to warmupCount).foreach { _ =>
       val t = new java.util.TreeMap[Long, String]()
       c(t)
     }
   
     (1 to runCount).foreach { _ =>
       val t = new java.util.TreeMap[Long, String]()
       latencies.add(JLong.valueOf(c(t)))
     }
   
     java.util.Collections.sort(latencies)
   
     printLatencies(latencies)
   }
   
   def experimentD(): Unit = {
     val latencies = new ArrayList[JLong]()
     val warmupCount = 1000000
     val runCount = 10000000
   
     val t = new java.util.TreeMap[Long, String]()
     (1 to warmupCount).foreach { idx =>
       d(t, idx, idx.toString)
     }
   
     val t2 = new java.util.TreeMap[Long, String]()
     (1 to runCount).foreach { idx =>
       latencies.add(JLong.valueOf(d(t2, idx, idx.toString)))
     }
   
     printLatencies(latencies)
   }
   
   def printLatencies(latencies: ArrayList[JLong]): Unit = {
     val arraySize = latencies.size()
     val minIdx = 0
     val maxIdx = arraySize - 1
     val percentile50 = (arraySize * 0.5).toInt
     val percentile90 = (arraySize * 0.9).toInt
     val percentile99 = (arraySize * 0.99).toInt
     val percentile999 = (arraySize * 0.999).toInt
     val percentile9999 = (arraySize * 0.9999).toInt
     val percentile99999 = (arraySize * 0.99999).toInt
     val percentile999999 = (arraySize * 0.999999).toInt
   
     java.util.Collections.sort(latencies)
   
     Seq(minIdx, percentile50, percentile90, percentile99, percentile999, 
percentile9999, percentile99999, percentile999999, maxIdx).foreach { idx =>
       printLatency(latencies, idx)
     }  
   }
   
   def printLatency(latencies: ArrayList[JLong], idx: Int): Unit = {
     println(s"$idx th : ${latencies.get(idx) / 1000} microseconds = 
${latencies.get(idx) / 1000000} milliseconds")
   }
   
  // Scenario C was run separately; its recorded output is preserved below
  // for comparison. Uncomment to re-run.
  // experimentC()
  
  // Observed latency distribution for scenario C (nanosecond samples,
  // printed as microseconds/milliseconds; index = rank in sorted order):
  /*
  0 th : 0 microseconds = 0 milliseconds
  5000000 th : 0 microseconds = 0 milliseconds
  9000000 th : 0 microseconds = 0 milliseconds
  9900000 th : 0 microseconds = 0 milliseconds
  9990000 th : 1 microseconds = 0 milliseconds
  9999000 th : 9 microseconds = 0 milliseconds
  9999900 th : 37 microseconds = 0 milliseconds
  9999990 th : 223 microseconds = 0 milliseconds
  9999999 th : 53612 microseconds = 53 milliseconds
  */
  
  // Run scenario D (single long-lived map); the output of one run follows.
  experimentD()
  
  // Observed latency distribution for scenario D:
  /*
  0 th : 0 microseconds = 0 milliseconds
  5000000 th : 0 microseconds = 0 milliseconds
  9000000 th : 0 microseconds = 0 milliseconds
  9900000 th : 0 microseconds = 0 milliseconds
  9990000 th : 0 microseconds = 0 milliseconds
  9999000 th : 6 microseconds = 0 milliseconds
  9999900 th : 25 microseconds = 0 milliseconds
  9999990 th : 150 microseconds = 0 milliseconds
  9999999 th : 57887 microseconds = 57 milliseconds
  */
   ```
   
   2018 13-inch MBP, i7 quad-core 2.7 GHz
   
   ```
   ./bin/spark-shell --driver-memory 2g
   ...
   Welcome to
         ____              __
        / __/__  ___ _____/ /__
       _\ \/ _ \/ _ `/ __/  '_/
      /___/ .__/\_,_/_/ /_/\_\   version 3.0.1
         /_/
   
   Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
   ```
   
   Still think this really matters?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to