[ https://issues.apache.org/jira/browse/SPARK-27852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shuaiqi Ge updated SPARK-27852:
-------------------------------
    Description: 
 In *_DiskBlockObjectWriter.scala_* there are two overloaded *_write_* functions. 
The first eventually executes _*updateBytesWritten*_ (via _*recordWritten*_), 
while the byte-oriented overload doesn't. I think writeMetrics should record 
every write operation, since some of this data is displayed in the Spark jobs 
UI, such as the shuffle read and shuffle write sizes.
{code:java}
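// Record-oriented overload: serializes a key/value pair and calls recordWritten().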
def write(key: Any, value: Any) {
   if (!streamOpen) {
     open()
   }

   objOut.writeKey(key)
   objOut.writeValue(value)
   recordWritten()
}

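// Byte-oriented overload: writes raw bytes directly to the stream without
// calling recordWritten(), so writeMetrics is not updated on this path.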
override def write(kvBytes: Array[Byte], offs: Int, len: Int): Unit = {
   if (!streamOpen) {
      open()
   }
   bs.write(kvBytes, offs, len)
}

/**
 * Notify the writer that a record worth of bytes has been written with
 * OutputStream#write.
 */
def recordWritten(): Unit = {
   numRecordsWritten += 1
   writeMetrics.incRecordsWritten(1)

   if (numRecordsWritten % 16384 == 0) {
     updateBytesWritten()
   }
}

/**
 * Report the number of bytes written in this writer's shuffle write metrics.
 * Note that this is only valid before the underlying streams are closed.
 */
private def updateBytesWritten() {
   val pos = channel.position()
   writeMetrics.incBytesWritten(pos - reportedPosition)
   reportedPosition = pos
}
{code}
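For illustration, here is a minimal sketch of what a fix might look like. This 
is an assumption on my part, not actual Spark code: it presumes that each call 
to the byte-oriented overload corresponds to a single record, and simply reuses 
the existing _*recordWritten()*_ sampling so that _*updateBytesWritten()*_ 
eventually runs on this path too.
{code:java}
override def write(kvBytes: Array[Byte], offs: Int, len: Int): Unit = {
   if (!streamOpen) {
      open()
   }
   bs.write(kvBytes, offs, len)

   // Hypothetical: count this call as one record, so the same sampling used by
   // the record-oriented overload (updateBytesWritten() every 16384 records)
   // also covers bytes written through this path.
   recordWritten()
}
{code}
Whether this is safe depends on the callers of the byte-oriented overload: if 
they already call _*recordWritten()*_ themselves after each record, counting 
here as well would double-count records.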
 

 

> One updateBytesWritten operation may be missed in DiskBlockObjectWriter.scala
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-27852
>                 URL: https://issues.apache.org/jira/browse/SPARK-27852
>             Project: Spark
>          Issue Type: Question
>          Components: Spark Core
>    Affects Versions: 2.4.3
>            Reporter: Shuaiqi Ge
>            Priority: Major
>


