zecookiez commented on code in PR #50195:
URL: https://github.com/apache/spark/pull/50195#discussion_r1989942743


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -203,20 +203,40 @@ trait StateStoreWriter
 
   def operatorStateMetadataVersion: Int = 1
 
-  override lazy val metrics = statefulOperatorCustomMetrics ++ Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
-    "numRowsDroppedByWatermark" -> SQLMetrics.createMetric(sparkContext,
-      "number of rows which are dropped by watermark"),
-    "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of 
total state rows"),
-    "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of 
updated state rows"),
-    "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to 
update"),
-    "numRemovedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of 
removed state rows"),
-    "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time 
to remove"),
-    "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to 
commit changes"),
-    "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used by 
state"),
-    "numStateStoreInstances" -> SQLMetrics.createMetric(sparkContext,
-      "number of state store instances")
-  ) ++ stateStoreCustomMetrics ++ pythonMetrics
+  override lazy val metrics = {
+    // Lazy initialize instance metrics, but do not include these with regular 
metrics
+    instanceMetrics
+    statefulOperatorCustomMetrics ++ Map(
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+      "numRowsDroppedByWatermark" -> SQLMetrics
+        .createMetric(sparkContext, "number of rows which are dropped by 
watermark"),
+      "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of 
total state rows"),
+      "numUpdatedStateRows" -> SQLMetrics
+        .createMetric(sparkContext, "number of updated state rows"),
+      "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time 
to update"),
+      "numRemovedStateRows" -> SQLMetrics
+        .createMetric(sparkContext, "number of removed state rows"),
+      "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time 
to remove"),
+      "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to 
commit changes"),
+      "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used 
by state"),
+      "numStateStoreInstances" -> SQLMetrics
+        .createMetric(sparkContext, "number of state store instances")
+    ) ++ stateStoreCustomMetrics ++ pythonMetrics
+  }
+
+  lazy val instanceMetrics = stateStoreInstanceMetrics

Review Comment:
   Good idea. I've added more context around what this is and why it's needed.
I've also simplified the logic behind the instance metric maps, because we no
longer need to force them to use string indexing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to