HeartSaVioR commented on a change in pull request #31495: URL: https://github.com/apache/spark/pull/31495#discussion_r571792973
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala ########## @@ -46,6 +47,23 @@ import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2} class OffsetSeqLog(sparkSession: SparkSession, path: String) extends HDFSMetadataLog[OffsetSeq](sparkSession, path) { + private val cachedMetadata = new ju.TreeMap[Long, OffsetSeq]() + + override def add(batchId: Long, metadata: OffsetSeq): Boolean = { + val added = super.add(batchId, metadata) + if (added) { + // cache metadata as it will be read again + cachedMetadata.put(batchId, metadata) + // we don't access metadata for (batchId - 2) batches; evict them Review comment: https://gist.github.com/HeartSaVioR/111ed75aa2dc4672e36968c02db83e26 ``` import java.lang.{Long => JLong} import java.util.{ArrayList, Collections, TreeMap} /** Times one round on a fresh map: put keys 1, 2 and 3, then evict every key <= 2 via headMap(2, true).clear(). Returns elapsed nanoseconds. */ def c(treeMap: TreeMap[Long, String]): Long = { val t1 = System.nanoTime() treeMap.put(1, "1") treeMap.put(2, "3") /* NOTE(review): value "3" for key 2 looks like a typo for "2"; values are never read, so the measurement is unaffected. */ treeMap.put(3, "3") treeMap.headMap(2, true).clear() (System.nanoTime() - t1) } /** Times the steady-state sliding-window step: put idx, then evict every key <= idx - 2 (headMap is inclusive here), so with sequential ids only idx - 1 and idx stay cached. This mirrors the "(batchId - 2)" eviction in the OffsetSeqLog.add change under review. Returns elapsed nanoseconds. */ def d(treeMap: TreeMap[Long, String], idx: Long, value: String): Long = { val t1 = System.nanoTime() treeMap.put(idx, value) treeMap.headMap(idx - 2, true).clear() (System.nanoTime() - t1) } /** Benchmarks c(): 1,000,000 untimed warm-up calls, then 10,000,000 timed calls, each on a brand-new TreeMap; prints latency percentiles. */ def experimentC(): Unit = { val latencies = new ArrayList[JLong]() val warmupCount = 1000000 val runCount = 10000000 (1 to warmupCount).foreach { _ => val t = new java.util.TreeMap[Long, String]() c(t) } (1 to runCount).foreach { _ => val t = new java.util.TreeMap[Long, String]() latencies.add(JLong.valueOf(c(t))) } java.util.Collections.sort(latencies) /* redundant: printLatencies sorts the list again */ printLatencies(latencies) } /** Benchmarks d(): 1,000,000 untimed warm-up calls with sequential ids on one TreeMap, then 10,000,000 timed calls on a second, separate TreeMap (t2); prints latency percentiles. */ def experimentD(): Unit = { val latencies = new ArrayList[JLong]() val warmupCount = 1000000 val runCount = 10000000 val t = new java.util.TreeMap[Long, String]() (1 to warmupCount).foreach { idx => d(t, idx, idx.toString) } val t2 = new java.util.TreeMap[Long, String]() (1 to runCount).foreach { idx => latencies.add(JLong.valueOf(d(t2, idx, idx.toString))) } 
printLatencies(latencies) } /** Sorts the samples in place, then prints min, p50, p90, p99, p99.9, p99.99, p99.999, p99.9999 and max. Percentile indexes are truncated with toInt. Assumes a non-empty list: with no samples, get(0) throws IndexOutOfBoundsException. */ def printLatencies(latencies: ArrayList[JLong]): Unit = { val arraySize = latencies.size() val minIdx = 0 val maxIdx = arraySize - 1 val percentile50 = (arraySize * 0.5).toInt val percentile90 = (arraySize * 0.9).toInt val percentile99 = (arraySize * 0.99).toInt val percentile999 = (arraySize * 0.999).toInt val percentile9999 = (arraySize * 0.9999).toInt val percentile99999 = (arraySize * 0.99999).toInt val percentile999999 = (arraySize * 0.999999).toInt java.util.Collections.sort(latencies) Seq(minIdx, percentile50, percentile90, percentile99, percentile999, percentile9999, percentile99999, percentile999999, maxIdx).foreach { idx => printLatency(latencies, idx) } } /** Prints one sample in whole microseconds and whole milliseconds. Both are truncating Long divisions of the nanosecond value, so anything under 1000 ns prints as 0 microseconds. */ def printLatency(latencies: ArrayList[JLong], idx: Int): Unit = { println(s"$idx th : ${latencies.get(idx) / 1000} microseconds = ${latencies.get(idx) / 1000000} milliseconds") } // experimentC() /* 0 th : 0 microseconds = 0 milliseconds 5000000 th : 0 microseconds = 0 milliseconds 9000000 th : 0 microseconds = 0 milliseconds 9900000 th : 0 microseconds = 0 milliseconds 9990000 th : 1 microseconds = 0 milliseconds 9999000 th : 9 microseconds = 0 milliseconds 9999900 th : 37 microseconds = 0 milliseconds 9999990 th : 223 microseconds = 0 milliseconds 9999999 th : 53612 microseconds = 53 milliseconds */ experimentD() /* 0 th : 0 microseconds = 0 milliseconds 5000000 th : 0 microseconds = 0 milliseconds 9000000 th : 0 microseconds = 0 milliseconds 9900000 th : 0 microseconds = 0 milliseconds 9990000 th : 0 microseconds = 0 milliseconds 9999000 th : 6 microseconds = 0 milliseconds 9999900 th : 25 microseconds = 0 milliseconds 9999990 th : 150 microseconds = 0 milliseconds 9999999 th : 57887 microseconds = 57 milliseconds */ ``` 2018 13-inch MBP, i7 quad-core 2.7Ghz ``` ./bin/spark-shell --driver-memory 2g ... 
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.1
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
```

Still think this really matters?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org