[ 
https://issues.apache.org/jira/browse/SPARK-33424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Jie updated SPARK-33424:
-----------------------------
    Description: 
Although there are some similar discussions in SPARK-17562, I still have some 
questions.

I found "DiskBlockObjectWriter#revertPartialWritesAndClose" method is called in 
5 places in Spark Code,

Two of the call points are in the 
"ExternalAppendOnlyMap#spillMemoryIteratorToDisk" method, two similar call 
points are in the "ExternalSorter#spillMemoryIteratorToDisk" method, and the 
last one is in the "BypassMergeSortShuffleWriter#stop" method.

Let's take the use of "ExternalAppendOnlyMap#spillMemoryIteratorToDisk" as an 
example:

 
{code:java}
var success = false
try {
  while (inMemoryIterator.hasNext) {
    val kv = inMemoryIterator.next()
    writer.write(kv._1, kv._2)
    objectsWritten += 1

    if (objectsWritten == serializerBatchSize) {
      flush()
    }
  }
  if (objectsWritten > 0) {
    flush()
    writer.close()
  } else {
    writer.revertPartialWritesAndClose() // The first call point 
  }
  success = true
} finally {
  if (!success) {
    // This code path only happens if an exception was thrown above before we set success;
    // close our stuff and let the exception be thrown further
    writer.revertPartialWritesAndClose() // The second call point
    if (file.exists()) {
      if (!file.delete()) {
        logWarning(s"Error deleting ${file}")
      }
    }
  }
}

{code}
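For reference, the flush() helper called in the snippet above (defined earlier in the same method) commits the current batch and updates the spill metrics:

{code:java}
// Flush the disk writer's contents to disk, and update relevant variables
def flush(): Unit = {
  val segment = writer.commitAndGet()
  batchSizes += segment.length
  _diskBytesSpilled += segment.length
  objectsWritten = 0
}
{code}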
 

There are two questions about the above code:

1. Can the first call, "writer.revertPartialWritesAndClose()", be replaced by 
"writer.close()"?

I think there are two possibilities for reaching this branch:
 * All of the data has already been flush()ed. In that case I think we can call 
"writer.close()" directly, because after flushing, the "committedPosition" of 
DiskBlockObjectWriter should equal file.length.
 * inMemoryIterator is empty. In that scenario, whether we call 
"revertPartialWritesAndClose()" or "close()", file.length is 0 either way.

Also, if "writer.close()" is used instead of "writer.revertPartialWritesAndClose()", 
all UTs still pass, so what is the specific scenario that requires calling the 
"revertPartialWritesAndClose()" method?

2. For the 2nd call point, is the main goal to roll back writeMetrics in 
DiskBlockObjectWriter?
 If we intend to delete this file anyway, is the truncate operation in the 
"revertPartialWritesAndClose()" method really necessary? In this scenario, 
should we just roll back writeMetrics without truncating the file, so as to 
save one disk operation?
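
As a rough illustration of the alternative I am asking about, the error path could hypothetically roll back only the metrics counters and delete the file directly, skipping the truncate. The helper and the metrics stub below are made up for illustration and are not existing Spark APIs:

{code:java}
import java.io.{File, FileOutputStream}

object Question2Demo {
  // Plain mutable stub standing in for the writer's write metrics.
  final class WriteMetricsStub(var bytesWritten: Long, var recordsWritten: Long)

  // Hypothetical error-path cleanup: revert only the in-memory metrics and
  // delete the file, without first truncating it back to committedPosition.
  def revertMetricsAndDelete(file: File, metrics: WriteMetricsStub): Unit = {
    metrics.bytesWritten = 0L
    metrics.recordsWritten = 0L
    if (file.exists() && !file.delete()) {
      System.err.println(s"Error deleting $file")
    }
  }

  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("spill", ".tmp")
    val out = new FileOutputStream(file)
    out.write(Array.fill[Byte](1024)(1)) // simulate a partial, uncommitted write
    out.close()

    val metrics = new WriteMetricsStub(bytesWritten = 1024L, recordsWritten = 1L)
    revertMetricsAndDelete(file, metrics)
    println(s"deleted: ${!file.exists()}, bytes: ${metrics.bytesWritten}, records: ${metrics.recordsWritten}")
  }
}
{code}

Whether skipping the truncate is actually safe in Spark (for example with respect to how the metrics are decremented) is exactly what I am asking.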
  

> Doubts about the use of the 
> "DiskBlockObjectWriter#revertPartialWritesAndClose" method in Spark Code
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-33424
>                 URL: https://issues.apache.org/jira/browse/SPARK-33424
>             Project: Spark
>          Issue Type: Question
>          Components: Spark Core
>    Affects Versions: 3.1.0
>            Reporter: Yang Jie
>            Priority: Minor
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
