HeartSaVioR commented on code in PR #50195:
URL: https://github.com/apache/spark/pull/50195#discussion_r1994940008
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1465,6 +1470,7 @@ class RocksDB(
log"with uniqueId: ${MDC(LogKeys.UUID, snapshot.uniqueId)} " +
log"time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms. " +
log"Current lineage: ${MDC(LogKeys.LINEAGE, lineageManager)}")
+ lastUploadedSnapshotVersion.set(snapshot.version)
Review Comment:
I guess there shouldn't be concurrent maintenance tasks running for the same
state store provider ID, and we shouldn't allow multiple state stores to
create multiple RocksDB instances either. That said, it'd still be better to
add an explicit guard here, simply to ensure the reported snapshot version
never goes backwards.
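
A minimal sketch of such a guard, assuming `lastUploadedSnapshotVersion` is a
`java.util.concurrent.atomic.AtomicLong` (the max-based update is illustrative,
not necessarily what the PR does):

```scala
import java.util.concurrent.atomic.AtomicLong

// Illustrative monotonic setter: the stored version can never decrease,
// even if two maintenance tasks for the same provider somehow raced.
val lastUploadedSnapshotVersion = new AtomicLong(-1L)

def recordUploadedSnapshot(version: Long): Unit = {
  // updateAndGet retries on contention, so the result is always the max
  // of the current value and the newly uploaded snapshot's version.
  lastUploadedSnapshotVersion.updateAndGet(current => math.max(current, version))
}
```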
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2233,6 +2233,19 @@ object SQLConf {
.intConf
.createWithDefault(10)
+  val STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT =
+    buildConf("spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport")
+      .internal()
+      .doc(
+        "Number of state store instance metrics included in streaming query progress messages " +
+          "per stateful operator. Instance metrics are selected based on metric-specific ordering " +
+          "to minimize noise in the progress report."
+      )
+      .version("4.0.0")
Review Comment:
Let's punt this out to 4.1.0.
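
For context, a usage sketch of the new knob, e.g. from a `spark-shell`
session (the key comes from the diff above; the value is an arbitrary example):

```scala
// Cap how many per-instance state store metrics each stateful operator
// reports in the streaming query progress; "5" is an arbitrary example.
spark.conf.set(
  "spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport", "5")
```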
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -203,20 +203,46 @@ trait StateStoreWriter
def operatorStateMetadataVersion: Int = 1
-  override lazy val metrics = statefulOperatorCustomMetrics ++ Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "numRowsDroppedByWatermark" -> SQLMetrics.createMetric(sparkContext,
-      "number of rows which are dropped by watermark"),
-    "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
-    "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"),
-    "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to update"),
-    "numRemovedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of removed state rows"),
-    "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to remove"),
-    "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to commit changes"),
-    "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used by state"),
-    "numStateStoreInstances" -> SQLMetrics.createMetric(sparkContext,
-      "number of state store instances")
-  ) ++ stateStoreCustomMetrics ++ pythonMetrics
+  override lazy val metrics = {
+    // Lazy initialize instance metrics, but do not include these with regular metrics
Review Comment:
So this is mostly the only diff from the reverted commit, do I understand
correctly?
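
For other reviewers, the shape of the change as I read it is roughly the
following sketch (`InstanceMetricsSketch`, `instanceMetrics`, and the metric
names here are placeholders, not the PR's actual identifiers):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

trait InstanceMetricsSketch {
  def sparkContext: SparkContext

  // Per-instance metrics live in their own lazily initialized map so they
  // can be registered on demand, one entry per state store instance.
  lazy val instanceMetrics: Map[String, SQLMetric] = Map(
    "snapshotLastUploaded.partition_0" ->
      SQLMetrics.createMetric(sparkContext, "last uploaded snapshot version"))

  // The regular metrics map stays as before; instanceMetrics is deliberately
  // NOT concatenated in, so progress reports are not flooded.
  lazy val metrics: Map[String, SQLMetric] = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
}
```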