[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...

2018-08-22 Thread 10110346
Github user 10110346 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22163#discussion_r212168161
  
--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
       long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
+        if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) {
--- End diff --

Yes, you are right.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...

2018-08-22 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22163#discussion_r212167438
  
--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
       long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
+        if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) {
--- End diff --

> The numRecordsWritten in DiskBlockObjectWriter is still correct during the process after this PR

The number is correct, but it is no longer consistent with what actually happens, compared to the current behaviour. But as you said, we will get the correct result in the end, so it may not be a big deal.


---




[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...

2018-08-22 Thread 10110346
Github user 10110346 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22163#discussion_r212166322
  
--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
       long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
+        if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) {
--- End diff --

The `numRecordsWritten` in `DiskBlockObjectWriter` is still correct during the process after this PR.


---




[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...

2018-08-22 Thread 10110346
Github user 10110346 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22163#discussion_r212165385
  
--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
       long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
+        if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) {
--- End diff --

Yeah, `_bytesWritten` would not have been updated synchronously before this change either; see the condition `if (numRecordsWritten % 16384 == 0)`.
But we do not need to worry: the final result is correct, because it is updated in `commitAndGet`.
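To make the point above concrete, here is a minimal, hypothetical model of the deferred metrics update being discussed (the names `bytesWrittenMetric` and `actualBytes` are illustrative stand-ins, not Spark's actual fields): the byte counter is only refreshed every 16384 records during writing, and reconciled once at commit.

```java
// Simplified model: the metric can lag behind reality mid-write,
// but commitAndGet() reconciles it, so the final value is correct.
public class DeferredMetricsSketch {
    static long numRecordsWritten = 0;
    static long bytesWrittenMetric = 0; // possibly stale while writing
    static long actualBytes = 0;        // bytes the stream has really written

    static void recordWritten(long recordBytes) {
        numRecordsWritten++;
        actualBytes += recordBytes;
        if (numRecordsWritten % 16384 == 0) {
            bytesWrittenMetric = actualBytes; // periodic refresh, as in the quoted condition
        }
    }

    static void commitAndGet() {
        bytesWrittenMetric = actualBytes; // final reconciliation
    }

    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) recordWritten(10);
        System.out.println(bytesWrittenMetric); // prints 0: fewer than 16384 records so far
        commitAndGet();
        System.out.println(bytesWrittenMetric); // prints 1000 after commit
    }
}
```

So the metric can read stale during the loop, but never after commit.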


---




[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...

2018-08-22 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22163#discussion_r212163785
  
--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
       long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
+        if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) {
--- End diff --

Yeah, I agree there's no difference in the final result. But `writeMetrics` in `DiskBlockObjectWriter` would be incorrect during the process. Not only `numRecordsWritten`, but also `_bytesWritten` (this can only be correctly counted when `writer.write()` is called; see `recordWritten#updateBytesWritten` for details).


---




[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...

2018-08-22 Thread 10110346
Github user 10110346 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22163#discussion_r212162206
  
--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
       long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
+        if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) {
--- End diff --

Actually, it doesn't need to change.
`numRecordsWritten` is not affected: before, a record was only written into `diskWriteBuffer`, but now it is written into `writeBuffer`.
The bytes that have been written will be updated in `commitAndGet`.


---




[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...

2018-08-22 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22163#discussion_r212160161
  
--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
       long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
+        if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) {
--- End diff --

Oh, I see. If so, I'm afraid you may have to change `writer.recordWritten()`'s behaviour, which just counts records one by one right now.


---




[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...

2018-08-22 Thread 10110346
Github user 10110346 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22163#discussion_r212154100
  
--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
       long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
+        if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) {
--- End diff --

Thanks @Ngone51
If we get `n` records with total size `n*X` < `diskWriteBufferSize` (same as `DISK_WRITE_BUFFER_SIZE`), then we will only call `writer.write()` once.


---




[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...

2018-08-22 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22163#discussion_r211954019
  
--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
       long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
+        if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) {
--- End diff --

Not a bad idea, but the code here may not work as you expect. If we get a record with size `X` < `diskWriteBufferSize` (same as `DISK_WRITE_BUFFER_SIZE`), then we will only call `writer.write()` once. And if we get a record with size `Y` >= `diskWriteBufferSize`, then we will call `writer.write()` (`Y` + `diskWriteBufferSize` - 1) / `diskWriteBufferSize` times. And this does not change with the new code.
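The batching behaviour being debated can be modeled in isolation. The sketch below is a simplified, hypothetical stand-in for the proposed logic (buffer size, `flushCount`, and `writeRecord` are illustrative names, not Spark's actual code): chunks are copied into a shared buffer and flushed to the writer only when the next chunk would overflow it, so several small records share one `writer.write()` call, while a large record still needs one write per buffer-sized chunk.

```java
// Model of the flush-before-overflow condition from the diff:
// flush only when bufferOffset > 0 and the next chunk would not fit.
public class BatchedWriteSketch {
    static final int WRITE_BUFFER_SIZE = 8; // tiny stand-in for DISK_WRITE_BUFFER_SIZE
    static int flushCount = 0;              // counts simulated writer.write() calls

    static byte[] buffer = new byte[WRITE_BUFFER_SIZE];
    static int bufferOffset = 0;

    static void flush() {
        if (bufferOffset > 0) {
            flushCount++;      // in Spark this would be writer.write(buffer, 0, bufferOffset)
            bufferOffset = 0;
        }
    }

    static void writeRecord(byte[] record) {
        int pos = 0;
        int remaining = record.length;
        while (remaining > 0) {
            int toTransfer = Math.min(WRITE_BUFFER_SIZE, remaining);
            // flush first if the chunk cannot fit behind the already-buffered bytes
            if (bufferOffset > 0 && bufferOffset + toTransfer > WRITE_BUFFER_SIZE) {
                flush();
            }
            System.arraycopy(record, pos, buffer, bufferOffset, toTransfer);
            bufferOffset += toTransfer;
            if (bufferOffset == WRITE_BUFFER_SIZE) {
                flush(); // buffer exactly full
            }
            pos += toTransfer;
            remaining -= toTransfer;
        }
    }

    public static void main(String[] args) {
        // four 2-byte records fit in one 8-byte buffer: a single flush suffices
        for (int i = 0; i < 4; i++) writeRecord(new byte[2]);
        flush(); // final flush, analogous to commitAndGet
        System.out.println(flushCount); // prints 1
    }
}
```

With a 20-byte record and an 8-byte buffer the same model performs 3 flushes, matching the (`Y` + `diskWriteBufferSize` - 1) / `diskWriteBufferSize` count above: batching only helps when several small records fit in one buffer.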


---




[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...

2018-08-20 Thread 10110346
GitHub user 10110346 opened a pull request:

https://github.com/apache/spark/pull/22163

[SPARK-25166][CORE]Reduce the number of write operations for shuffle write.

## What changes were proposed in this pull request?

Currently, only one record is written into the buffer each time, which increases the number of copies.
I think we should write as many records as possible into the buffer each time.

## How was this patch tested?
Existing unit tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/10110346/spark reducewrite

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22163.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22163


commit 671268b679f9221fd96e9ab2ea929df4a9908de8
Author: liuxian 
Date:   2018-08-21T02:42:30Z

fix




---
