Hi,

I've been reviewing how StateStoreSaveExec works per output mode,
focusing on Complete output mode [1] at the moment.

My understanding is that in Complete output mode StateStoreSaveExec
uses the metrics as follows:

* numRowsTotal is the total number of state keys in the state store
* numRowsUpdated is the number of state keys that were updated in the
state store (i.e. keys that were available earlier and appeared again
in the result rows of the upstream physical operator)
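To make that reading concrete, here is a tiny model of the semantics I
have in mind (plain Scala, not the actual StateStoreSaveExec code, so
treat it purely as my assumption):

```scala
import scala.collection.mutable

// My mental model of the metrics (an assumption, not the Spark source):
// every result row is written into the state store, a row counts as
// "updated" when its key was already present, and numRowsTotal is the
// store size after the batch.
case class BatchMetrics(numRowsTotal: Long, numRowsUpdated: Long)

def saveBatch(
    store: mutable.Map[String, Int],
    batch: Seq[(String, Int)]): BatchMetrics = {
  var updated = 0L
  for ((key, value) <- batch) {
    if (store.contains(key)) updated += 1 // key was available earlier
    store(key) = value
  }
  BatchMetrics(numRowsTotal = store.size, numRowsUpdated = updated)
}
```

Feeding this model the batches from the session below, ("0",1) first
and then ("1",1) with ("0",2), gives numRowsUpdated = 0 and then 1,
which is what I'd expect the progress reports to show.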

With that I wrote the following query (described in more detail in the
Spark Structured Streaming gitbook [2]):

// a stateful streaming aggregation over Kafka records of the form "group,value"
val valuesPerGroup = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  withColumn("tokens", split('value, ",")).
  withColumn("group", 'tokens(0)).
  withColumn("value", 'tokens(1) cast "int").
  select("group", "value").
  groupBy($"group").
  agg(collect_list("value") as "values").
  orderBy($"group".asc)
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val sq = valuesPerGroup.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Complete).
  start
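For quicker experiments without a Kafka broker, one could use
MemoryStream instead; note it lives in an internal package
(org.apache.spark.sql.execution.streaming), so this is just a
spark-shell sketch, not something to rely on:

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.functions.collect_list
import spark.implicits._
implicit val sqlCtx = spark.sqlContext

// (group, value) records fed in by hand instead of Kafka
val input = MemoryStream[(String, Int)]
val grouped = input.toDS.
  toDF("group", "value").
  groupBy($"group").
  agg(collect_list("value") as "values")

val query = grouped.writeStream.
  format("console").
  outputMode(OutputMode.Complete).
  start
input.addData(("0", 1))       // same first batch as with Kafka
query.processAllAvailable()   // wait for the batch to complete
println(query.lastProgress.stateOperators(0).prettyJson)
```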

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+------+
|group|values|
+-----+------+
+-----+------+

// there's only 1 stateful operator and hence 0 for the index in stateOperators
scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
  "numRowsTotal" : 0,
  "numRowsUpdated" : 0,
  "memoryUsedBytes" : 60
}

// publish 1 new key and value in a single streaming batch
// 0,1

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+------+
|group|values|
+-----+------+
|0    |[1]   |
+-----+------+

// it's Complete output mode so numRowsTotal is the number of keys in the state store
// no keys were available earlier (the query has just started!) so numRowsUpdated is 0
scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
  "numRowsTotal" : 1,
  "numRowsUpdated" : 0,
  "memoryUsedBytes" : 324
}

// publish a new key and an old key in a single streaming batch
// new keys
// 1,1
// updates to already-stored keys
// 0,2

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+------+
|group|values|
+-----+------+
|0    |[2, 1]|
|1    |[1]   |
+-----+------+

// it's Complete output mode so numRowsTotal is the number of keys in the state store
// key 0 was available earlier, and yet numRowsUpdated is...0?!
// I think it's a BUG as it should've been 1 (for the row 0,2)
scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
  "numRowsTotal" : 2,
  "numRowsUpdated" : 0,
  "memoryUsedBytes" : 572
}

As you can see, numRowsUpdated is 0 while I think it should have been
1 (for the row 0,2), as the key 0 was already in the state store.

There's this code [3], which resets the newNumRowsUpdated metric when
no new data is available, and it looks fishy.
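The reset I mean is, from memory, roughly shaped like the following;
this is my paraphrase of [3], not the verbatim source, and the names
hasNewData and progressFromExecutedPlan are stand-ins I made up, so
please double-check against the linked code:

```scala
// Schematic paraphrase of the reset in [3] (hypothetical names, not
// the actual Spark source):
val stateOperators = if (hasNewData) {
  progressFromExecutedPlan        // metrics as reported by the operators
} else {
  // no new data: numRowsUpdated is zeroed for every stateful operator
  progressFromExecutedPlan.map(op => op.copy(newNumRowsUpdated = 0))
}
```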

Is my understanding correct, and should numRowsUpdated be 1 in
Complete output mode? Where's the issue: in the code or in my
understanding?
I'd appreciate any help. Thanks!

[1] 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L249

[2] 
https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-StateStoreSaveExec.html

[3] 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L192

Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Spark Structured Streaming (Apache Spark 2.2+)
https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski
