zecookiez commented on code in PR #50195:
URL: https://github.com/apache/spark/pull/50195#discussion_r1989942743
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -203,20 +203,40 @@ trait StateStoreWriter
def operatorStateMetadataVersion: Int = 1
- override lazy val metrics = statefulOperatorCustomMetrics ++ Map(
- "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
- "numRowsDroppedByWatermark" -> SQLMetrics.createMetric(sparkContext,
- "number of rows which are dropped by watermark"),
- "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of
total state rows"),
- "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of
updated state rows"),
- "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to
update"),
- "numRemovedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of
removed state rows"),
- "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time
to remove"),
- "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to
commit changes"),
- "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used by
state"),
- "numStateStoreInstances" -> SQLMetrics.createMetric(sparkContext,
- "number of state store instances")
- ) ++ stateStoreCustomMetrics ++ pythonMetrics
+ override lazy val metrics = {
+ // Lazy initialize instance metrics, but do not include these with regular
metrics
+ instanceMetrics
+ statefulOperatorCustomMetrics ++ Map(
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
+ "numRowsDroppedByWatermark" -> SQLMetrics
+ .createMetric(sparkContext, "number of rows which are dropped by
watermark"),
+ "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of
total state rows"),
+ "numUpdatedStateRows" -> SQLMetrics
+ .createMetric(sparkContext, "number of updated state rows"),
+ "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time
to update"),
+ "numRemovedStateRows" -> SQLMetrics
+ .createMetric(sparkContext, "number of removed state rows"),
+ "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time
to remove"),
+ "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to
commit changes"),
+ "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used
by state"),
+ "numStateStoreInstances" -> SQLMetrics
+ .createMetric(sparkContext, "number of state store instances")
+ ) ++ stateStoreCustomMetrics ++ pythonMetrics
+ }
+
+ lazy val instanceMetrics = stateStoreInstanceMetrics
Review Comment:
Good idea — I added more context around what this is and why it's needed. I've
also simplified the logic behind the instance metric maps, because we no longer
need to force them to use string indexing.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]