xuanyuanking commented on a change in pull request #34502:
URL: https://github.com/apache/spark/pull/34502#discussion_r751969435



##########
File path: docs/structured-streaming-programming-guide.md
##########
@@ -1956,8 +1956,21 @@ Here are the configs regarding to RocksDB instance of the state store provider:
     <td>Whether we resets all ticker and histogram stats for RocksDB on load.</td>
     <td>True</td>
   </tr>
+  <tr>
+    <td>spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows</td>
+    <td>Whether we track the total number of rows in state store. Please refer the details in <a href="#performance-aspect-considerations">Performance-aspect considerations</a>.</td>
+    <td>True</td>
+  </tr>
 </table>
 
+##### Performance-aspect considerations
+
+1. For write-heavy workloads, you may want to disable the track of total number of rows.

Review comment:
       Do we have other considerations here, or will we add more in the future? (Just want to double-check that the `1.` is not a typo.)

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -144,26 +156,28 @@ class RocksDB(
    * Put the given value for the given key and return the last written value.
    * @note This update is not committed to disk until commit() is called.
    */
-  def put(key: Array[Byte], value: Array[Byte]): Array[Byte] = {
-    val oldValue = writeBatch.getFromBatchAndDB(db, readOptions, key)
-    writeBatch.put(key, value)
-    if (oldValue == null) {
-      numKeysOnWritingVersion += 1
+  def put(key: Array[Byte], value: Array[Byte]): Unit = {
+    if (conf.trackTotalNumberOfRows) {
+      val oldValue = writeBatch.getFromBatchAndDB(db, readOptions, key)
+      if (oldValue == null) {
+        numKeysOnWritingVersion += 1
+      }
     }
-    oldValue
+    writeBatch.put(key, value)
   }
 
   /**
    * Remove the key if present, and return the previous value if it was present (null otherwise).

Review comment:
       Same here
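To make the trade-off behind the `put` change concrete, here is a minimal sketch, assuming a plain in-memory `HashMap` stands in for RocksDB's write batch plus DB. `TrackingStore`, `lookups`, `numKeys`, and `TrackingStoreDemo` are hypothetical names for illustration only, not Spark's `RocksDB` class. With tracking enabled, every `put`/`remove` pays an extra existence check (the analogue of `writeBatch.getFromBatchAndDB`) so the row count stays accurate; with tracking disabled, writes skip that read and the count is simply not maintained. The `remove` side is an assumption that it mirrors `put`.

```scala
import scala.collection.mutable

// Hypothetical in-memory model of the put/remove paths in the diff above.
// A HashMap stands in for the write batch plus DB; this is not Spark's API.
final class TrackingStore(trackTotalNumberOfRows: Boolean) {
  private val data = mutable.HashMap.empty[String, String]

  // Counts read-before-write lookups (the analogue of getFromBatchAndDB).
  var lookups: Long = 0L
  // Total row count; only maintained while tracking is enabled.
  var numKeys: Long = 0L

  def put(key: String, value: String): Unit = {
    if (trackTotalNumberOfRows) {
      lookups += 1 // extra read paid on every write when tracking
      if (!data.contains(key)) numKeys += 1
    }
    data.update(key, value) // the write itself happens either way
  }

  def remove(key: String): Unit = {
    if (trackTotalNumberOfRows) {
      lookups += 1 // same extra read on delete
      if (data.contains(key)) numKeys -= 1
    }
    data -= key
  }

  def size: Int = data.size
}

object TrackingStoreDemo {
  // Runs the same small write sequence against a store.
  def run(track: Boolean): TrackingStore = {
    val store = new TrackingStore(track)
    store.put("a", "1")
    store.put("a", "2") // overwrite: must not bump the count
    store.put("b", "3")
    store.remove("a")
    store
  }

  def main(args: Array[String]): Unit = {
    for (track <- Seq(true, false)) {
      val s = run(track)
      println(s"track=$track numKeys=${s.numKeys} lookups=${s.lookups} size=${s.size}")
    }
  }
}
```

In this model, the tracked run performs one lookup per write, while the untracked run performs none and leaves `numKeys` at 0, which is the cost/accuracy trade-off the docs change describes for write-heavy workloads.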

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -144,26 +156,28 @@ class RocksDB(
    * Put the given value for the given key and return the last written value.

Review comment:
       Please also update the comment accordingly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


