zecookiez commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r1978294921
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala:
##########
@@ -155,16 +156,296 @@ class StateStoreCoordinatorSuite extends SparkFunSuite
with SharedSparkContext {
StateStore.stop()
}
}
+
+ test(
+ "SPARK-51358: Snapshot uploads in RocksDB are not reported if changelog " +
+ "checkpointing is disabled"
+ ) {
+ withCoordinatorAndSQLConf(
+ sc,
+ SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+ SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+ SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+    SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+    RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
+      ".changelogCheckpointing.enabled" -> "false"
+ ) {
+ case (coordRef, spark) =>
+ import spark.implicits._
+ implicit val sqlContext = spark.sqlContext
+
+ // Start a query and run some data to force snapshot uploads
+ val inputData = MemoryStream[Int]
+ val aggregated = inputData.toDF().dropDuplicates()
+ val checkpointLocation = Utils.createTempDir().getAbsoluteFile
+ val query = aggregated.writeStream
+ .format("memory")
+ .outputMode("update")
+ .queryName("query")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .start()
+ inputData.addData(1, 2, 3)
+ query.processAllAvailable()
+ inputData.addData(1, 2, 3)
+ query.processAllAvailable()
+ val stateCheckpointDir =
+        query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.checkpointLocation
+
+      // Verify stores do not report snapshot upload events to the coordinator.
+ // As a result, all stores will return nothing as the latest version
+      (0 until query.sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)).foreach { partitionId =>
+ val providerId =
+          StateStoreProviderId(StateStoreId(stateCheckpointDir, 0, partitionId), query.runId)
+ assert(coordRef.getLatestSnapshotVersion(providerId).isEmpty)
+ }
+ query.stop()
+ }
+ }
+
+  test("SPARK-51358: Snapshot uploads in RocksDB are properly reported to the coordinator") {
+ withCoordinatorAndSQLConf(
+ sc,
+ SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+ SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+ SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+    SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+    RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
+      ".changelogCheckpointing.enabled" -> "true",
+    SQLConf.STATE_STORE_COORDINATOR_MIN_SNAPSHOT_VERSION_DELTA_TO_LOG.key -> "2"
+ ) {
+ case (coordRef, spark) =>
+ import spark.implicits._
+ implicit val sqlContext = spark.sqlContext
+
+ // Start a query and run some data to force snapshot uploads
+ val inputData = MemoryStream[Int]
+ val aggregated = inputData.toDF().dropDuplicates()
+ val checkpointLocation = Utils.createTempDir().getAbsoluteFile
+ val query = aggregated.writeStream
+ .format("memory")
+ .outputMode("update")
+ .queryName("query")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .start()
+ inputData.addData(1, 2, 3)
+ query.processAllAvailable()
+ inputData.addData(1, 2, 3)
+ query.processAllAvailable()
+ val stateCheckpointDir =
+        query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.checkpointLocation
+
+      // Verify all stores have uploaded a snapshot and it's logged by the coordinator
+      (0 until query.sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)).foreach { partitionId =>
+ val providerId =
+          StateStoreProviderId(StateStoreId(stateCheckpointDir, 0, partitionId), query.runId)
+ assert(coordRef.getLatestSnapshotVersion(providerId).get >= 0)
+ }
+      // Verify that no state stores are lagging behind
+ assert(coordRef.getLaggingStores().isEmpty)
+ query.stop()
+ }
+ }
+
+ test(
+ "SPARK-51358: Snapshot uploads in
RocksDBSkipMaintenanceOnCertainPartitionsProvider " +
+ "are properly reported to the coordinator"
+ ) {
+ withCoordinatorAndSQLConf(
+ sc,
+ SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+ SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+ SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+ SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+ classOf[SkipMaintenanceOnCertainPartitionsProvider].getName,
+    RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
+      ".changelogCheckpointing.enabled" -> "true",
+    SQLConf.STATE_STORE_COORDINATOR_MIN_SNAPSHOT_VERSION_DELTA_TO_LOG.key -> "2"
+ ) {
+ case (coordRef, spark) =>
+ import spark.implicits._
+ implicit val sqlContext = spark.sqlContext
+
+ // Start a query and run some data to force snapshot uploads
+ val inputData = MemoryStream[Int]
+ val aggregated = inputData.toDF().dropDuplicates()
+ val checkpointLocation = Utils.createTempDir().getAbsoluteFile
+ val query = aggregated.writeStream
+ .format("memory")
+ .outputMode("update")
+ .queryName("query")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .start()
+ inputData.addData(1, 2, 3)
+ query.processAllAvailable()
+ inputData.addData(1, 2, 3)
Review Comment:
It's more that I need multiple `query.processAllAvailable()` calls to commit and progress to a new version, but I also didn't want to provide zero data. I'll add a comment to make this more clear :+1:
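
For reference, a rough sketch of how those lines might read once the comment is added (same `inputData` and `query` as in the test above; the wording is only a suggestion):

```scala
// Run two non-empty batches so the store commits twice and moves past the
// first version: the second commit gives the maintenance task a newer
// version to snapshot and report to the coordinator.
inputData.addData(1, 2, 3)
query.processAllAvailable()
inputData.addData(1, 2, 3)
query.processAllAvailable()
```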