[ 
https://issues.apache.org/jira/browse/SPARK-53491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-53491.
----------------------------------
    Fix Version/s: 4.1.0
                       (was: 4.0.0)
       Resolution: Fixed

Issue resolved by pull request 52237
[https://github.com/apache/spark/pull/52237]

> SS | `inputRowsPerSecond` and `processedRowsPerSecond` in progress report show values in exponential notation for large numbers
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-53491
>                 URL: https://issues.apache.org/jira/browse/SPARK-53491
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.5.0, 3.5.2, 4.0.0
>            Reporter: Jayant Sharma
>            Assignee: Jayant Sharma
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.1.0
>
>
> In Structured Streaming, the progress metrics *inputRowsPerSecond* and 
> *processedRowsPerSecond* are stored as *Double* and serialized as Double 
> values in the JSON representation. Large values can therefore be reported in 
> exponential/scientific notation (e.g., {*}6.923076923076923E7{*}) or as 
> doubles with many fractional digits (e.g., {*}638297.8723404256{*}).
> This formatting is not user-friendly: a user can easily read 
> *6.923076923076923E7* as *6.92* instead of {*}69,230,769.23{*}, because the 
> *E* is easy to miss.
> The following code reproduces the issue:
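> {code:scala}
> // Scala, e.g. a %scala notebook cell or spark-shell
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.execution.streaming.MemoryStream
> import spark.implicits._
>
> // MemoryStream takes an implicit SQLContext
> implicit val sqlContext: SQLContext = spark.sqlContext
>
> val inputData = MemoryStream[Int]
> val df = inputData.toDF()
> val query =
>   df.writeStream.format("memory").queryName("TestFormatting").start()
>
> // Add a single large batch of 900,000 rows
> val bigBatch = (1 to 900000).toSeq
> inputData.addData(bigBatch: _*)
> query.processAllAvailable()
>
> // Print the progress report of the last completed batch (pretty JSON)
> val progress = query.lastProgress
> println(progress)
> query.stop()
> {code}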
>  
> Resulting progress report:
> {code:java}
> {
>   "id" : "9b512179-ea36-4b98-9d79-049d13813894",
>   "runId" : "f85e2894-9582-493d-9b94-ce03e5490241",
>   "name" : "TestFormatting",
>   "timestamp" : "2025-09-04T10:57:02.897Z",
>   "batchId" : 0,
>   "batchDuration" : 1410,
>   "numInputRows" : 900000,
>   "inputRowsPerSecond" : 6.923076923076923E7,
>   "processedRowsPerSecond" : 638297.8723404256,
>   "durationMs" : {
>     "addBatch" : 1101,
>     "commitOffsets" : 157,
>     "getBatch" : 0,
>     "latestOffset" : 0,
>     "queryPlanning" : 3,
>     "triggerExecution" : 1410,
>     "walCommit" : 149
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
>     "description" : "MemoryStream[value#133]",
>     "startOffset" : null,
>     "endOffset" : 0,
>     "latestOffset" : null,
>     "numInputRows" : 900000,
>     "inputRowsPerSecond" : 6.923076923076923E7,
>     "processedRowsPerSecond" : 638297.8723404256
>   } ],
>   "sink" : {
>     "description" : "MemorySink",
>     "numOutputRows" : 900000
>   }
> }
> {code}
>  
>  
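The figures in the report can be checked with simple arithmetic, and the difference in notation follows from how Double values are rendered as text. The Scala sketch below assumes that a rate is the row count divided by an interval converted to seconds, which is consistent with the report: 900000 rows / 1.41 s (batchDuration = 1410 ms) = 638297.87..., and 6.923076923076923E7 corresponds to an interval of about 13 ms. `ProgressRateSketch`, `rowsPerSecond`, `defaultRendering` and `formatRate` are hypothetical names used only for illustration; they are not Spark APIs, and `formatRate` is just one possible plain-decimal rendering, not necessarily the approach taken in pull request 52237.

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}

object ProgressRateSketch {
  // Assumption: a rate is rows divided by an interval converted to seconds.
  // 900000 rows over 1410 ms gives 638297.87...; 900000 rows over 13 ms
  // gives roughly 6.923076923076923E7, matching the report above.
  def rowsPerSecond(numRows: Long, intervalMs: Long): Double =
    numRows / (intervalMs / 1000.0)

  // Scala's Double.toString delegates to java.lang.Double.toString, which
  // switches to scientific notation when |d| >= 1e7 or |d| < 1e-3.
  def defaultRendering(rate: Double): String = rate.toString

  // Hypothetical helper (not a Spark API): render a rate as a plain decimal
  // rounded HALF_UP to `scale` fractional digits, never using an exponent.
  def formatRate(rate: Double, scale: Int = 2): String =
    JBigDecimal.valueOf(rate).setScale(scale, RoundingMode.HALF_UP).toPlainString

  def main(args: Array[String]): Unit = {
    val processed = rowsPerSecond(900000L, 1410L)
    val input = rowsPerSecond(900000L, 13L)
    // Default rendering first, then the plain two-decimal form
    println(s"processed: ${defaultRendering(processed)} -> ${formatRate(processed)}")
    println(s"input:     ${defaultRendering(input)} -> ${formatRate(input)}")
  }
}
```

Only the rate of roughly 69 million crosses the 10^7 threshold of Double.toString, which explains why only *inputRowsPerSecond* shows an exponent in the report while *processedRowsPerSecond* is printed as a long plain decimal.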



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
