[ https://issues.apache.org/jira/browse/SPARK-45759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ali Ince updated SPARK-45759:
-----------------------------
Description:
We have a DataWriter component that processes records in configurable batches: records accumulate in the {{write(T record)}} implementation and are sent to the persistent store when the configured batch size is reached. With this approach, the last batch is handled during the {{commit()}} call, since there is no other way to know whether more records are coming.

We are now adding support for custom metrics by implementing {{supportedCustomMetrics()}} and {{currentMetricsValues()}} in our {{Write}} and {{DataWriter}} implementations. The problem is that {{CustomMetrics.updateMetrics}} is only called [during|https://github.com/apache/spark/blob/af8907a0873f5ca192b150f28a0c112107594722/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala#L443-L443] and [just after|https://github.com/apache/spark/blob/af8907a0873f5ca192b150f28a0c112107594722/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala#L451-L451] record processing, so the metrics we observe are incomplete: the last batch, handled during the {{commit()}} call, is never collected/updated.

We therefore propose also calling {{CustomMetrics.updateMetrics}} after {{commit()}} completes successfully, ideally just before the {{run}} function exits (perhaps just above [https://github.com/apache/spark/blob/af8907a0873f5ca192b150f28a0c112107594722/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala#L473-L473]).
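To illustrate the undercount, here is a minimal, self-contained sketch of the batching pattern described above (plain Java with illustrative class and method names, not Spark's actual API): a metric that counts persisted records only reflects full batches until {{commit()}} flushes the trailing partial batch, so sampling the metric only during/after record processing misses it.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for a batching DataWriter: records accumulate in a
// buffer and are flushed to the store when batchSize is reached; the final
// partial batch is flushed only inside commit().
class BatchingWriter {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private long recordsPersisted = 0;  // the "custom metric" value

    BatchingWriter(int batchSize) { this.batchSize = batchSize; }

    void write(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) flush();
    }

    // The last (possibly partial) batch is only flushed here.
    void commit() { flush(); }

    private void flush() {
        recordsPersisted += buffer.size();  // pretend we sent the batch to the store
        buffer.clear();
    }

    long currentMetricValue() { return recordsPersisted; }
}

public class Demo {
    public static void main(String[] args) {
        BatchingWriter w = new BatchingWriter(10);
        for (int i = 0; i < 25; i++) w.write("record-" + i);

        // Metric sampled after the write loop but before commit() sees only
        // the two full batches (20 records), not the trailing 5.
        long beforeCommit = w.currentMetricValue();
        w.commit();
        long afterCommit = w.currentMetricValue();

        System.out.println(beforeCommit + " -> " + afterCommit);  // 20 -> 25
    }
}
```

If the executor only reads {{currentMetricsValues()}} while records are being processed, it reports 20 here; only a read after {{commit()}} sees all 25, which is exactly what the proposed extra {{CustomMetrics.updateMetrics}} call would capture.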
> Custom metrics should be updated after commit too
> -------------------------------------------------
>
>                 Key: SPARK-45759
>                 URL: https://issues.apache.org/jira/browse/SPARK-45759
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.1
>            Reporter: Ali Ince
>            Priority: Minor
--
This message was sent by Atlassian Jira
(v8.20.10#820010)