[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...
Github user 10110346 commented on a diff in the pull request: https://github.com/apache/spark/pull/22163#discussion_r212168161

--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
       long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
+        if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) {
--- End diff --

Yes, you are right.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/22163#discussion_r212167438

--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
       long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
+        if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) {
--- End diff --

> The numRecordsWritten in DiskBlockObjectWriter is still correct during the process after this PR

The number is correct, but it is not consistent with what really happens, compared to the current behaviour. But as you said, we will get the correct result at the end. So, it may not be a big deal.
[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...
Github user 10110346 commented on a diff in the pull request: https://github.com/apache/spark/pull/22163#discussion_r212166322

--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
       long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
+        if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) {
--- End diff --

The `numRecordsWritten` in `DiskBlockObjectWriter` is still correct during the process after this PR.
[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...
Github user 10110346 commented on a diff in the pull request: https://github.com/apache/spark/pull/22163#discussion_r212165385

--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
       long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
+        if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) {
--- End diff --

Yeah, `_bytesWritten` was not updated synchronously before this change either; you can see this condition: `if (numRecordsWritten % 16384 == 0)`. But we do not need to worry: the final result is correct, because it will be updated in `commitAndGet`.
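The deferred-metrics behaviour described above can be sketched as follows. This is a hypothetical, simplified illustration, not Spark's `DiskBlockObjectWriter`: the reported byte count is refreshed only every 16384 records (mirroring the `numRecordsWritten % 16384 == 0` check quoted above) and made exact at commit time. The class and field names here are illustrative.

```java
// Sketch of the deferred metrics-update pattern: the reported value may lag
// behind the actual value between periodic refreshes, but is exact at commit.
class DeferredWriteMetrics {
    long numRecordsWritten = 0;
    long actualBytes = 0;   // what has really been written so far
    long reportedBytes = 0; // what the metrics currently show

    void recordWritten(long recordBytes) {
        actualBytes += recordBytes;
        numRecordsWritten++;
        if (numRecordsWritten % 16384 == 0) {
            reportedBytes = actualBytes; // periodic refresh only
        }
    }

    void commitAndGet() {
        reportedBytes = actualBytes; // final, exact update at commit
    }
}
```

Between refreshes `reportedBytes` is stale, which is the "not updated synchronously" behaviour the comment refers to; `commitAndGet` reconciles it.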
[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/22163#discussion_r212163785

--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
       long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
+        if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) {
--- End diff --

Yeah, I agree there's no difference as far as the final result is concerned. But `writeMetrics` in `DiskBlockObjectWriter` would be incorrect during the process. Not only `numRecordsWritten`, but also `_bytesWritten` (this can only be correctly counted when `writer.write()` is called; see `recordWritten#updateBytesWritten` for details).
[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...
Github user 10110346 commented on a diff in the pull request: https://github.com/apache/spark/pull/22163#discussion_r212162206

--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
       long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
+        if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) {
--- End diff --

Actually, it doesn't need to change. `numRecordsWritten` is not affected: before, a record was only written into `diskWriteBuffer`, but now it's written to `writeBuffer`. The bytes that have been written will be updated in `commitAndGet`.
[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/22163#discussion_r212160161

--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
       long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
+        if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) {
--- End diff --

Oh, I see. If so, I'm afraid you may have to change `writer.recordWritten()`'s behaviour, which just counts records one by one right now.
[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...
Github user 10110346 commented on a diff in the pull request: https://github.com/apache/spark/pull/22163#discussion_r212154100

--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
       long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
+        if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) {
--- End diff --

Thanks @Ngone51
With this change, if we get `n` records with total size `n*X` < `diskWriteBufferSize` (same as `DISK_WRITE_BUFFER_SIZE`), then we will only call `writer.write()` once.
[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/22163#discussion_r211954019

--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
       long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
+        if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) {
--- End diff --

Not a bad idea, but the code here may not work as you expect. If we get a record with size `X` < `diskWriteBufferSize` (same as `DISK_WRITE_BUFFER_SIZE`), then we will only call `writer.write()` once. And if we get a record with size `Y` >= `diskWriteBufferSize`, then we will call `writer.write()` (`Y` + `diskWriteBufferSize` - 1) / `diskWriteBufferSize` times. And this does not change with the new code.
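The per-record write count stated above is just ceiling division, which can be checked with a tiny helper (the `WriteCounts` class and method name here are illustrative, not Spark code):

```java
// For a single record of recordSize bytes pushed through a disk-write buffer
// of bufferSize bytes, the pre-PR code issues one writer.write() call per
// chunk of at most bufferSize bytes, i.e. ceil(recordSize / bufferSize) calls.
class WriteCounts {
    static int writesPerRecord(int recordSize, int bufferSize) {
        return (recordSize + bufferSize - 1) / bufferSize; // ceiling division
    }
}
```

For `recordSize` < `bufferSize` this yields exactly one call, matching the `X` < `diskWriteBufferSize` case in the comment.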
[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...
GitHub user 10110346 opened a pull request: https://github.com/apache/spark/pull/22163

[SPARK-25166][CORE] Reduce the number of write operations for shuffle write.

## What changes were proposed in this pull request?

Currently, only one record is written to the buffer each time, which increases the number of copies. I think we should write as many records as possible each time.

## How was this patch tested?

Existing unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/10110346/spark reducewrite

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22163.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22163

commit 671268b679f9221fd96e9ab2ea929df4a9908de8
Author: liuxian
Date: 2018-08-21T02:42:30Z

    fix
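The batching idea the PR proposes can be sketched as follows. This is a hypothetical, simplified standalone version, not Spark's actual `ShuffleExternalSorter`: chunks are accumulated in a shared buffer and flushed only when the next chunk would overflow it, mirroring the `bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE` condition in the diff. The `BufferedRecordWriter` class and its method names are illustrative.

```java
import java.io.IOException;
import java.io.OutputStream;

// Sketch of batched buffering: small records accumulate in writeBuffer and are
// written out together, instead of one writer.write() call per record.
class BufferedRecordWriter {
    private final byte[] writeBuffer;
    private final OutputStream out;
    private int bufferOffset = 0;

    BufferedRecordWriter(OutputStream out, int diskWriteBufferSize) {
        this.out = out;
        this.writeBuffer = new byte[diskWriteBufferSize];
    }

    // Mirrors the loop in the diff: each record is split into chunks of at most
    // writeBuffer.length bytes; a flush happens only when a chunk would not fit.
    void write(byte[] record) throws IOException {
        int readPos = 0;
        int dataRemaining = record.length;
        while (dataRemaining > 0) {
            final int toTransfer = Math.min(writeBuffer.length, dataRemaining);
            if (bufferOffset > 0 && bufferOffset + toTransfer > writeBuffer.length) {
                flush();
            }
            System.arraycopy(record, readPos, writeBuffer, bufferOffset, toTransfer);
            bufferOffset += toTransfer;
            readPos += toTransfer;
            dataRemaining -= toTransfer;
        }
    }

    void flush() throws IOException {
        if (bufferOffset > 0) {
            out.write(writeBuffer, 0, bufferOffset);
            bufferOffset = 0;
        }
    }
}
```

With this layout, `n` records whose combined size is below the buffer size trigger a single underlying write, which is the reduction the PR title refers to; a record larger than the buffer still degrades to one write per buffer-sized chunk.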