[ https://issues.apache.org/jira/browse/SPARK-33424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yang Jie updated SPARK-33424:
-----------------------------
    Description:

Although there are some similar discussions in SPARK-17562, I still have some questions.

I found that the "DiskBlockObjectWriter#revertPartialWritesAndClose" method is called in 5 places in the Spark code: two of the call points are in the "ExternalAppendOnlyMap#spillMemoryIteratorToDisk" method, two similar call points are in the "ExternalSorter#spillMemoryIteratorToDisk" method, and the last is in the "BypassMergeSortShuffleWriter#stop" method.

Let's take the use of "ExternalAppendOnlyMap#spillMemoryIteratorToDisk" as an example:
{code:java}
var success = false
try {
  while (inMemoryIterator.hasNext) {
    val kv = inMemoryIterator.next()
    writer.write(kv._1, kv._2)
    objectsWritten += 1
    if (objectsWritten == serializerBatchSize) {
      flush()
    }
  }
  if (objectsWritten > 0) {
    flush()
    writer.close()
  } else {
    writer.revertPartialWritesAndClose() // The first call point
  }
  success = true
} finally {
  if (!success) {
    // This code path only happens if an exception was thrown above before we set success;
    // close our stuff and let the exception be thrown further
    writer.revertPartialWritesAndClose() // The second call point
    if (file.exists()) {
      if (!file.delete()) {
        logWarning(s"Error deleting ${file}")
      }
    }
  }
}
{code}

There are two questions about the above code:

1. Can the first call to "writer.revertPartialWritesAndClose()" be replaced by "writer.close()"?

I think there are two possibilities for reaching this branch:
* One possibility is that all data has already been flushed. In that case I think we can call "writer.close()" directly, because all data has been flushed and the "committedPosition" of DiskBlockObjectWriter should equal file.length.
* The other is that inMemoryIterator is empty. In that scenario, whether we call "revertPartialWritesAndClose()" or "close()", file.length is 0 either way.

If "writer.close()" is used instead of "writer.revertPartialWritesAndClose()", all UTs still pass, so what is the specific scenario that requires calling "revertPartialWritesAndClose()"? (A minimal sketch of the difference between the two calls is included after question 2 below.)

2. For the second call point, is the main goal to roll back writeMetrics in DiskBlockObjectWriter? If we want to delete this file, is the truncate operation in "revertPartialWritesAndClose()" really necessary? In this scenario, should we just roll back writeMetrics without truncating the file, to save one disk operation?
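For reference, the following is a minimal, self-contained sketch of the reasoning in question 1. It is NOT Spark's actual DiskBlockObjectWriter; ToyDiskWriter, commit(), and metricBytes are invented names used only for illustration. The point it makes concrete is that when nothing has been written since the last commit, revertPartialWritesAndClose() rolls back zero bytes and truncates to the current file length, so its observable effect matches a plain close().
{code:java}
import java.io.{File, FileOutputStream}

// Toy model (not Spark code) of a disk writer that tracks a committed position
// and a bytes-written metric, used only to compare close() with revert-and-close.
class ToyDiskWriter(file: File) {
  private val out = new FileOutputStream(file, true)
  private var committedPosition = 0L // bytes made permanent by the last commit
  private var bytesWritten = 0L      // metric counted for everything written so far

  def write(bytes: Array[Byte]): Unit = {
    out.write(bytes)
    bytesWritten += bytes.length
  }

  // Plays roughly the role of commitAndGet(): flush and mark the current length as committed.
  def commit(): Unit = {
    out.flush()
    committedPosition = file.length()
  }

  // Plain close(): keep whatever has been written, committed or not.
  def close(): Unit = out.close()

  // Revert anything written after the last commit: roll the metric back,
  // truncate the file to committedPosition, and close.
  def revertPartialWritesAndClose(): Unit = {
    out.close()
    bytesWritten -= (file.length() - committedPosition)
    val ch = new FileOutputStream(file, true).getChannel
    try ch.truncate(committedPosition) finally ch.close()
  }

  def metricBytes: Long = bytesWritten
}

object ToyDiskWriterDemo extends App {
  val f = File.createTempFile("toy-spill", ".dat")
  f.deleteOnExit()
  val w = new ToyDiskWriter(f)
  w.write(Array[Byte](1, 2, 3))
  w.commit()                      // committedPosition == file length == 3
  w.revertPartialWritesAndClose() // rolls back 0 bytes and truncates to 3: same as close()
  println(s"length=${f.length()}, metricBytes=${w.metricBytes}")
}
{code}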
was:

Although there are some similar discussions in SPARK-17562, I still have some questions.

I found that the "DiskBlockObjectWriter#revertPartialWritesAndClose" method is called in 5 places in the Spark code: two of the call points are in the "ExternalAppendOnlyMap#spillMemoryIteratorToDisk" method, two similar call points are in the "ExternalSorter#spillMemoryIteratorToDisk" method, and the last is in the "BypassMergeSortShuffleWriter#stop" method.

Let's take the use of "ExternalAppendOnlyMap#spillMemoryIteratorToDisk" as an example:
{code:java}
val (blockId, file) = diskBlockManager.createTempLocalBlock()
val writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, writeMetrics)
var objectsWritten = 0

// List of batch sizes (bytes) in the order they are written to disk
val batchSizes = new ArrayBuffer[Long]

// Flush the disk writer's contents to disk, and update relevant variables
def flush(): Unit = {
  val segment = writer.commitAndGet()
  batchSizes += segment.length
  _diskBytesSpilled += segment.length
  objectsWritten = 0
}

var success = false
try {
  while (inMemoryIterator.hasNext) {
    val kv = inMemoryIterator.next()
    writer.write(kv._1, kv._2)
    objectsWritten += 1
    if (objectsWritten == serializerBatchSize) {
      flush()
    }
  }
  if (objectsWritten > 0) {
    flush()
    writer.close()
  } else {
    writer.revertPartialWritesAndClose() // The first call point
  }
  success = true
} finally {
  if (!success) {
    // This code path only happens if an exception was thrown above before we set success;
    // close our stuff and let the exception be thrown further
    writer.revertPartialWritesAndClose() // The second call point
    if (file.exists()) {
      if (!file.delete()) {
        logWarning(s"Error deleting ${file}")
      }
    }
  }
}

new DiskMapIterator(file, blockId, batchSizes)
{code}

There are two questions about the above code:

1. Can the first call to "writer.revertPartialWritesAndClose()" be replaced by "writer.close()"?

I think there are two possibilities for reaching this branch:
* One possibility is that all data has already been flushed. In that case I think we can call "writer.close()" directly, because all data has been flushed and the "committedPosition" of DiskBlockObjectWriter should equal file.length.
* The other is that inMemoryIterator is empty. In that scenario, whether we call "revertPartialWritesAndClose()" or "close()", file.length is 0 either way.

If "writer.close()" is used instead of "writer.revertPartialWritesAndClose()", all UTs still pass, so what is the specific scenario that requires calling "revertPartialWritesAndClose()"?

2. For the second call point, is the main goal to roll back writeMetrics in DiskBlockObjectWriter? If we want to delete this file, is the truncate operation in "revertPartialWritesAndClose()" really necessary? In this scenario, should we just roll back writeMetrics without truncating the file, to save one disk operation?
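To make question 2 concrete, here is a sketch of the alternative cleanup being asked about. The helper names (FailureCleanupSketch, cleanupFailedSpill, rollbackMetric, uncommittedBytes) are hypothetical and not Spark APIs; this only illustrates rolling back the metric and deleting the temporary file without the intermediate truncate that revertPartialWritesAndClose() would perform.
{code:java}
import java.io.File

// Hypothetical failure-path cleanup for the second call point (not Spark code):
// if the spill failed and the temp file is deleted anyway, question 2 asks whether
// rolling back the write metric is sufficient and the extra truncate can be skipped.
object FailureCleanupSketch {
  // `uncommittedBytes` stands for "bytes written since the last commitAndGet()";
  // `rollbackMetric` stands for decrementing writeMetrics by that amount.
  def cleanupFailedSpill(file: File, uncommittedBytes: Long, rollbackMetric: Long => Unit): Unit = {
    rollbackMetric(uncommittedBytes) // undo the metric for data that never became a valid spill
    if (file.exists() && !file.delete()) {
      System.err.println(s"Error deleting $file") // mirrors the logWarning in the snippet above
    }
  }
}

object FailureCleanupDemo extends App {
  val f = File.createTempFile("spill", ".tmp")
  var metricBytes = 128L // pretend 128 uncommitted bytes were counted before the failure
  FailureCleanupSketch.cleanupFailedSpill(f, 128L, delta => metricBytes -= delta)
  println(s"metricBytes=$metricBytes, fileExists=${f.exists()}")
}
{code}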
> Doubts about the use of the "DiskBlockObjectWriter#revertPartialWritesAndClose" method in Spark Code
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-33424
>                 URL: https://issues.apache.org/jira/browse/SPARK-33424
>             Project: Spark
>          Issue Type: Question
>          Components: Spark Core
>    Affects Versions: 3.1.0
>            Reporter: Yang Jie
>            Priority: Minor