Hi, I've been reviewing how StateStoreSaveExec works per output mode, focusing on Complete output mode [1] at the moment.
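For reference, here is a minimal, self-contained Scala sketch of how I read the two state-store metrics. This is my own simulation of the semantics, not Spark's actual code: `processBatch` and the plain `mutable.Map` standing in for the state store are hypothetical names I made up for illustration.

```scala
import scala.collection.mutable

// Simulated state store: key -> accumulated values (like collect_list).
// Returns (numRowsTotal, numRowsUpdated) as I understand them:
//  - numRowsTotal: all keys in the store after the batch
//  - numRowsUpdated: keys in this batch that were already in the store
def processBatch(
    state: mutable.Map[String, List[Int]],
    batch: Seq[(String, Int)]): (Long, Long) = {
  val updated = batch.map(_._1).distinct.count(state.contains)
  batch.foreach { case (k, v) =>
    state(k) = v :: state.getOrElse(k, Nil)
  }
  (state.size.toLong, updated.toLong)
}

val state = mutable.Map.empty[String, List[Int]]
// Batch 1: row "0,1" -> key 0 is brand new
println(processBatch(state, Seq(("0", 1))))              // prints (1,0)
// Batch 2: rows "1,1" (new key) and "0,2" (key 0 already stored)
println(processBatch(state, Seq(("1", 1), ("0", 2))))    // prints (2,1)
```

Under this reading, the second batch should report numRowsUpdated = 1, which is what I expected from the real query below.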
My understanding is that in Complete output mode StateStoreSaveExec uses the metrics as follows:

* numRowsTotal is the number of all the state keys in the state store
* numRowsUpdated is the number of the state keys that were updated in the state store (i.e. the keys were available earlier and appeared in the result rows of the upstream physical operator)

With that I wrote the following query (described in more detail in the Spark Structured Streaming gitbook [2]):

val valuesPerGroup = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  withColumn("tokens", split('value, ",")).
  withColumn("group", 'tokens(0)).
  withColumn("value", 'tokens(1) cast "int").
  select("group", "value").
  groupBy($"group").
  agg(collect_list("value") as "values").
  orderBy($"group".asc)

import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val sq = valuesPerGroup.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Complete).
  start

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+------+
|group|values|
+-----+------+
+-----+------+

// there's only 1 stateful operator and hence 0 for the index in stateOperators
scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
  "numRowsTotal" : 0,
  "numRowsUpdated" : 0,
  "memoryUsedBytes" : 60
}

// publish 1 new key and value in a single streaming batch
// 0,1

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+------+
|group|values|
+-----+------+
|0    |[1]   |
+-----+------+

// it's Complete output mode so numRowsTotal is the number of keys in the state store
// no keys were available earlier (it's just started!)
// and so numRowsUpdated is 0

scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
  "numRowsTotal" : 1,
  "numRowsUpdated" : 0,
  "memoryUsedBytes" : 324
}

// publish a new key and an old key in a single streaming batch
// new keys
// 1,1
// updates to already-stored keys
// 0,2

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+------+
|group|values|
+-----+------+
|0    |[2, 1]|
|1    |[1]   |
+-----+------+

// it's Complete output mode so numRowsTotal is the number of keys in the state store
// no keys were available earlier and so numRowsUpdated is...0?!
// I think it's a BUG as it should have been 1 (for the row 0,2)
scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
  "numRowsTotal" : 2,
  "numRowsUpdated" : 0,
  "memoryUsedBytes" : 572
}

As you can see, numRowsUpdated is 0 while I think it should have been 1 (for the row 0,2), as the key 0 was already in the state store. There's code [3] that resets the newNumRowsUpdated metric when no new data is available, which looks fishy.

Is my understanding correct, and should numRowsUpdated be 1 in Complete output mode? Where's the issue (in the code or in my understanding)?

I'd appreciate any help. Thanks!

[1] https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L249
[2] https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-StateStoreSaveExec.html
[3] https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L192

Best regards,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Spark Structured Streaming (Apache Spark 2.2+) https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org