ericm-db commented on code in PR #50195:
URL: https://github.com/apache/spark/pull/50195#discussion_r1989829264
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1465,6 +1470,7 @@ class RocksDB(
log"with uniqueId: ${MDC(LogKeys.UUID, snapshot.uniqueId)} " +
log"time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms. " +
log"Current lineage: ${MDC(LogKeys.LINEAGE, lineageManager)}")
+ lastUploadedSnapshotVersion.set(snapshot.version)
Review Comment:
Hm, do we need to do something like
`lastUploadedSnapshotVersion.set(math.max(lastUploadedSnapshotVersion.get(), snapshot.version))`?
Is there a case where an older snapshot could finish uploading after a newer one, which would make this necessary?
cc @HeartSaVioR
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -203,20 +203,40 @@ trait StateStoreWriter
def operatorStateMetadataVersion: Int = 1
- override lazy val metrics = statefulOperatorCustomMetrics ++ Map(
- "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
- "numRowsDroppedByWatermark" -> SQLMetrics.createMetric(sparkContext,
- "number of rows which are dropped by watermark"),
- "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of
total state rows"),
- "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of
updated state rows"),
- "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to
update"),
- "numRemovedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of
removed state rows"),
- "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time
to remove"),
- "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to
commit changes"),
- "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used by
state"),
- "numStateStoreInstances" -> SQLMetrics.createMetric(sparkContext,
- "number of state store instances")
- ) ++ stateStoreCustomMetrics ++ pythonMetrics
+ override lazy val metrics = {
+ // Lazy initialize instance metrics, but do not include these with regular
metrics
+ instanceMetrics
+ statefulOperatorCustomMetrics ++ Map(
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
+ "numRowsDroppedByWatermark" -> SQLMetrics
+ .createMetric(sparkContext, "number of rows which are dropped by
watermark"),
+ "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of
total state rows"),
+ "numUpdatedStateRows" -> SQLMetrics
+ .createMetric(sparkContext, "number of updated state rows"),
+ "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time
to update"),
+ "numRemovedStateRows" -> SQLMetrics
+ .createMetric(sparkContext, "number of removed state rows"),
+ "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time
to remove"),
+ "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to
commit changes"),
+ "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used
by state"),
+ "numStateStoreInstances" -> SQLMetrics
+ .createMetric(sparkContext, "number of state store instances")
+ ) ++ stateStoreCustomMetrics ++ pythonMetrics
+ }
+
+ lazy val instanceMetrics = stateStoreInstanceMetrics
Review Comment:
Can you leave a comment explaining what `instanceMetrics` is?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]