[ 
https://issues.apache.org/jira/browse/SPARK-33424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Jie updated SPARK-33424:
-----------------------------
    Description: 
Although there are some similar discussions in SPARK-17562, I still have some 
questions.

I found "DiskBlockObjectWriter#revertPartialWritesAndClose" method is called in 
5 places in Spark Code,

Two of the call points are in the 
"ExternalAppendOnlyMap#spillMemoryIteratorToDisk" method, two similar call 
points are in the "ExternalSorter#spillMemoryIteratorToDisk" method, and the 
last  is in the "BypassMergeSortShuffleWriter#stop" method.

Let's take the use of "ExternalAppendOnlyMap#spillMemoryIteratorToDisk" as an 
example:

 
{code:java}
var success = false
try {
  while (inMemoryIterator.hasNext) {
    ...

    if (objectsWritten == serializerBatchSize) {
      flush() // commits the current batch and resets objectsWritten to 0
    }
  }
  if (objectsWritten > 0) {
    flush()
    writer.close()
  } else {
    writer.revertPartialWritesAndClose() // The first call point 
  }
  success = true
} finally {
  if (!success) {
    writer.revertPartialWritesAndClose() // The second call point
    if (file.exists()) {
      if (!file.delete()) {
        logWarning(s"Error deleting ${file}")
      }
    }
  }
}

{code}
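
For context, here is my understanding of "revertPartialWritesAndClose", 
reduced to a toy model. This is not Spark's actual DiskBlockObjectWriter, just 
a sketch of the two pieces of state the method restores: it rolls the write 
metrics back to the last committed position and truncates the file to 
"committedPosition" before closing.

{code:scala}
import java.io.{File, FileOutputStream}

// Toy model (NOT Spark's DiskBlockObjectWriter): tracks the two pieces of
// state that revertPartialWritesAndClose restores, namely the write metrics
// and the file length, both rolled back to the last committed position.
final class ToyDiskWriter(file: File) {
  private val out = new FileOutputStream(file, true)
  private var committedPosition = 0L
  var bytesWrittenMetric = 0L

  def write(bytes: Array[Byte]): Unit = {
    out.write(bytes)
    bytesWrittenMetric += bytes.length
  }

  // Like flush()/commitAndGet() in the real writer: advance committedPosition.
  def commit(): Unit = {
    out.flush()
    committedPosition = file.length()
  }

  def close(): Unit = out.close()

  // Roll the metric back to the committed state, then truncate the file.
  def revertPartialWritesAndClose(): Unit = {
    bytesWrittenMetric -= (file.length() - committedPosition)
    out.close()
    val truncateStream = new FileOutputStream(file, true)
    try truncateStream.getChannel.truncate(committedPosition)
    finally truncateStream.close()
  }
}
{code}

With this model in mind, question 1 below is about the case where 
committedPosition already equals file.length (so there is nothing to revert), 
and question 2 is about whether the truncate step is worth doing when the file 
is deleted right afterwards.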
 

There are two questions about the above code:

1. Can the first call, "writer.revertPartialWritesAndClose()", be replaced by 
"writer.close()"?

I think there are two possibilities for reaching this branch (objectsWritten == 0):
 * One possibility is that all data has already been flushed at a batch 
boundary. In that case I think we can call "writer.close()" directly, because 
everything has been committed and the "committedPosition" of 
DiskBlockObjectWriter should equal file.length.
 * The other is that inMemoryIterator was empty from the start. In this 
scenario, whether we call "revertPartialWritesAndClose()" or "close()", 
file.length is 0 either way; the test "commit() and close() without ever 
opening or writing" in DiskBlockObjectWriterSuite demonstrates this, and the 
sketch after this question illustrates the same point.

I also tried replacing "writer.revertPartialWritesAndClose()" with 
"writer.close()", and all UTs in the core module passed. So what is the 
specific scenario that makes "revertPartialWritesAndClose()" necessary here?

2. For the second call point, is the main goal to roll back writeMetrics in 
DiskBlockObjectWriter?
 If we want to delete this file anyway, is the truncate operation in the 
"revertPartialWritesAndClose()" method really necessary? In this scenario, 
should we just roll back writeMetrics without truncating the file, to save one 
disk operation? A hypothetical sketch of that alternative follows.
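
To make this concrete, here is a hypothetical alternative for the error path 
(not Spark's actual API, just a sketch under the assumption that the file is 
deleted immediately afterwards): only the metrics rollback is kept, and the 
truncate is skipped.

{code:scala}
import java.io.File

object AbortSpillSketch {
  // Toy stand-in for Spark's write metrics, for illustration only.
  final class ToyWriteMetrics {
    var bytesWritten = 0L
    var recordsWritten = 0L
  }

  // Hypothetical helper: roll back only the uncommitted metrics and delete the
  // file, skipping the truncate that revertPartialWritesAndClose would perform.
  def abortSpill(file: File, metrics: ToyWriteMetrics,
                 uncommittedBytes: Long, uncommittedRecords: Long): Unit = {
    metrics.bytesWritten -= uncommittedBytes
    metrics.recordsWritten -= uncommittedRecords
    if (file.exists() && !file.delete()) {
      System.err.println(s"Error deleting $file")
    }
  }
}
{code}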
  

> Doubts about the use of the 
> "DiskBlockObjectWriter#revertPartialWritesAndClose" method in Spark Code
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-33424
>                 URL: https://issues.apache.org/jira/browse/SPARK-33424
>             Project: Spark
>          Issue Type: Question
>          Components: Spark Core
>    Affects Versions: 3.1.0
>            Reporter: Yang Jie
>            Priority: Minor
>


