[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans
sadikovi commented on a change in pull request #34611: URL: https://github.com/apache/spark/pull/34611#discussion_r754834288 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ## @@ -53,19 +53,50 @@ public void skip() { throw new UnsupportedOperationException(); } + private void updateCurrentByte() { +try { + currentByte = (byte) in.read(); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read a byte", e); +} + } + @Override public final void readBooleans(int total, WritableColumnVector c, int rowId) { -// TODO: properly vectorize this -for (int i = 0; i < total; i++) { - c.putBoolean(rowId + i, readBoolean()); +int i = 0; +if (bitOffset > 0) { + i = Math.min(8 - bitOffset, total); + c.putBooleans(rowId, i, currentByte, bitOffset); + bitOffset = (bitOffset + i) & 7; +} +for (; i + 7 < total; i += 8) { + updateCurrentByte(); + c.putBooleans(rowId + i, currentByte); +} +if (i < total) { + updateCurrentByte(); + bitOffset = total - i; + c.putBooleans(rowId + i, bitOffset, currentByte, 0); } } @Override public final void skipBooleans(int total) { -// TODO: properly vectorize this -for (int i = 0; i < total; i++) { - readBoolean(); +// Using >>3 instead of /8 below. The difference is important when (total-(8-bitOffset))<0. +// E.g. (-1)>>3=(-1) vs. (-1)/8=0. The latter incorrectly enters the if(numBytesToSkip>=0){. Review comment: Why do you even need to enter if numBytesToSkip is 0? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
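To make the rounding difference described in the `skipBooleans` code comment concrete, here is a small standalone Java sketch. The class and method names (`ShiftVsDivide`, `remainingBits`, `bytesByShift`, `bytesByDivide`) are hypothetical and not part of the PR; the only thing taken from the diff is the expression `total - (8 - bitOffset)`.

```java
// Hypothetical illustration, not code from the PR.
final class ShiftVsDivide {
  // Bits left to skip after draining the partially consumed current byte.
  // Negative when `total` fits entirely inside the current byte.
  static int remainingBits(int total, int bitOffset) {
    return total - (8 - bitOffset);
  }

  // Arithmetic shift rounds toward negative infinity: (-1) >> 3 == -1.
  static int bytesByShift(int total, int bitOffset) {
    return remainingBits(total, bitOffset) >> 3;
  }

  // Integer division rounds toward zero: (-1) / 8 == 0.
  static int bytesByDivide(int total, int bitOffset) {
    return remainingBits(total, bitOffset) / 8;
  }

  public static void main(String[] args) {
    int bitOffset = 3; // 5 unread bits remain in the current byte
    for (int total : new int[] {1, 4, 5, 12, 13}) {
      System.out.printf("total=%d remaining=%d >>3=%d /8=%d%n",
          total,
          remainingBits(total, bitOffset),
          bytesByShift(total, bitOffset),
          bytesByDivide(total, bitOffset));
    }
  }
}
```

The two operators only disagree when the intermediate value is negative, so a guard that keeps it non-negative makes the choice between them a matter of style. `Math.floorDiv(-1, 8)` also returns `-1` if floor semantics are wanted without a shift.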
[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans
sadikovi commented on a change in pull request #34611:
URL: https://github.com/apache/spark/pull/34611#discussion_r754830520

## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ##
@@ -53,19 +53,47 @@ public void skip() {
     throw new UnsupportedOperationException();
   }
 
+  private void updateCurrentByte() {
+    try {
+      currentByte = (byte) in.read();
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read a byte", e);
+    }
+  }
+
   @Override
   public final void readBooleans(int total, WritableColumnVector c, int rowId) {
-    // TODO: properly vectorize this
-    for (int i = 0; i < total; i++) {
-      c.putBoolean(rowId + i, readBoolean());
+    int i = 0;
+    if (bitOffset > 0) {
+      i = Math.min(8 - bitOffset, total);

Review comment:
Does the code work for total = 9 and bitOffset = 0? Can you add a test case for this?
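One way to explore the `total = 9, bitOffset = 0` question is to run the chunked logic from the diff against a bit-by-bit reference. The sketch below is an assumption-laden model, not the PR's code: `BooleanBatchSketch`, `putBits`, `readInChunks` and `readOneByOne` are hypothetical names, a plain `byte[]` stands in for the input stream, a `boolean[]` stands in for `WritableColumnVector`, and `putBits` is assumed to behave like `putBooleans(rowId, count, src, srcIndex)` with bits stored least-significant first.

```java
import java.util.Arrays;

// Hypothetical model of the reader state; not the Spark class.
final class BooleanBatchSketch {
  private final byte[] data;
  private int pos = 0;
  private int bitOffset = 0;
  private byte currentByte = 0;

  BooleanBatchSketch(byte[] data) { this.data = data; }

  private void updateCurrentByte() { currentByte = data[pos++]; }

  // Reference path: one bit at a time, loading a byte lazily when bitOffset is 0.
  boolean readBoolean() {
    if (bitOffset == 0) updateCurrentByte();
    boolean v = (currentByte & (1 << bitOffset)) != 0;
    bitOffset = (bitOffset + 1) & 7;
    return v;
  }

  // Assumed stand-in for putBooleans(rowId, count, src, srcIndex).
  private static void putBits(boolean[] out, int rowId, int count, byte src, int srcIndex) {
    for (int k = 0; k < count; k++) {
      out[rowId + k] = ((src >> (srcIndex + k)) & 1) != 0;
    }
  }

  // Same control flow as the readBooleans body shown in the diff.
  void readBooleans(int total, boolean[] out, int rowId) {
    int i = 0;
    if (bitOffset > 0) {              // drain the partially consumed byte
      i = Math.min(8 - bitOffset, total);
      putBits(out, rowId, i, currentByte, bitOffset);
      bitOffset = (bitOffset + i) & 7;
    }
    for (; i + 7 < total; i += 8) {   // whole bytes
      updateCurrentByte();
      putBits(out, rowId + i, 8, currentByte, 0);
    }
    if (i < total) {                  // trailing partial byte
      updateCurrentByte();
      bitOffset = total - i;
      putBits(out, rowId + i, bitOffset, currentByte, 0);
    }
  }

  static boolean[] readInChunks(byte[] data, int... chunkSizes) {
    int n = 0;
    for (int c : chunkSizes) n += c;
    boolean[] out = new boolean[n];
    BooleanBatchSketch r = new BooleanBatchSketch(data);
    int rowId = 0;
    for (int c : chunkSizes) {
      r.readBooleans(c, out, rowId);
      rowId += c;
    }
    return out;
  }

  static boolean[] readOneByOne(byte[] data, int n) {
    boolean[] out = new boolean[n];
    BooleanBatchSketch r = new BooleanBatchSketch(data);
    for (int k = 0; k < n; k++) out[k] = r.readBoolean();
    return out;
  }

  public static void main(String[] args) {
    byte[] data = {0x33, 0x5A, (byte) 0xA5, (byte) 0xCC};
    boolean[] expected = readOneByOne(data, 32);
    // 9 bits from a byte boundary, then 7 (exactly 8 - bitOffset), then 3, then 13.
    boolean[] actual = readInChunks(data, 9, 7, 3, 13);
    System.out.println(Arrays.equals(expected, actual));
  }
}
```

In this model the first call reads one whole byte and leaves `bitOffset = 1`; the second call consumes exactly the remaining 7 bits, which resets `bitOffset` to 0 without loading another byte, so the next call starts with a fresh `updateCurrentByte()`. That suggests repeated calls compose correctly, under the stated assumptions about `putBooleans`.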
[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans
sadikovi commented on a change in pull request #34611: URL: https://github.com/apache/spark/pull/34611#discussion_r754826874 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ## @@ -53,19 +53,47 @@ public void skip() { throw new UnsupportedOperationException(); } + private void updateCurrentByte() { +try { + currentByte = (byte) in.read(); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read a byte", e); +} + } + @Override public final void readBooleans(int total, WritableColumnVector c, int rowId) { -// TODO: properly vectorize this -for (int i = 0; i < total; i++) { - c.putBoolean(rowId + i, readBoolean()); +int i = 0; +if (bitOffset > 0) { + i = Math.min(8 - bitOffset, total); + c.putBooleans(rowId, i, currentByte, bitOffset); + bitOffset = (bitOffset + i) & 7; +} +for (; i + 7 < total; i += 8) { + updateCurrentByte(); + c.putBooleans(rowId + i, currentByte); +} +if (i < total) { + updateCurrentByte(); + bitOffset = total - i; + c.putBooleans(rowId + i, bitOffset, currentByte, 0); } } @Override public final void skipBooleans(int total) { -// TODO: properly vectorize this -for (int i = 0; i < total; i++) { - readBoolean(); +// using >>3 instead of /8 below since Java division rounds towards zero i.e. (-1)/8=0 Review comment: My concern is that the testing coverage is low and we could introduce subtle bugs that could be difficult to debug. I guess it is fine to keep as is. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans
sadikovi commented on a change in pull request #34611: URL: https://github.com/apache/spark/pull/34611#discussion_r754826446 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ## @@ -53,19 +53,47 @@ public void skip() { throw new UnsupportedOperationException(); } + private void updateCurrentByte() { +try { + currentByte = (byte) in.read(); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read a byte", e); +} + } + @Override public final void readBooleans(int total, WritableColumnVector c, int rowId) { -// TODO: properly vectorize this -for (int i = 0; i < total; i++) { - c.putBoolean(rowId + i, readBoolean()); +int i = 0; +if (bitOffset > 0) { + i = Math.min(8 - bitOffset, total); + c.putBooleans(rowId, i, currentByte, bitOffset); + bitOffset = (bitOffset + i) & 7; +} +for (; i + 7 < total; i += 8) { + updateCurrentByte(); + c.putBooleans(rowId + i, currentByte); +} +if (i < total) { + updateCurrentByte(); + bitOffset = total - i; + c.putBooleans(rowId + i, bitOffset, currentByte, 0); } } @Override public final void skipBooleans(int total) { -// TODO: properly vectorize this -for (int i = 0; i < total; i++) { - readBoolean(); +// using >>3 instead of /8 below since Java division rounds towards zero i.e. (-1)/8=0 Review comment: Hmm.. Why do you even need to go into the if statement if numBytesToSkip is 0? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans
sadikovi commented on a change in pull request #34611: URL: https://github.com/apache/spark/pull/34611#discussion_r754804617 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ## @@ -53,19 +53,47 @@ public void skip() { throw new UnsupportedOperationException(); } + private void updateCurrentByte() { +try { + currentByte = (byte) in.read(); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read a byte", e); +} + } + @Override public final void readBooleans(int total, WritableColumnVector c, int rowId) { -// TODO: properly vectorize this -for (int i = 0; i < total; i++) { - c.putBoolean(rowId + i, readBoolean()); +int i = 0; +if (bitOffset > 0) { + i = Math.min(8 - bitOffset, total); Review comment: Yes, I understand but what about `readBooleans()` method? Does it mean that I can only call readBooleans() once? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans
sadikovi commented on a change in pull request #34611: URL: https://github.com/apache/spark/pull/34611#discussion_r754797714 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ## @@ -53,19 +53,47 @@ public void skip() { throw new UnsupportedOperationException(); } + private void updateCurrentByte() { +try { + currentByte = (byte) in.read(); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read a byte", e); +} + } + @Override public final void readBooleans(int total, WritableColumnVector c, int rowId) { -// TODO: properly vectorize this -for (int i = 0; i < total; i++) { - c.putBoolean(rowId + i, readBoolean()); +int i = 0; +if (bitOffset > 0) { + i = Math.min(8 - bitOffset, total); Review comment: Never mind, I missed that you were updating the variable i. By the way, what happens when total = 8 - bitOffset? Do we need to call updateCurrentByte() somewhere for this as bitOffset will become 0? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans
sadikovi commented on a change in pull request #34611: URL: https://github.com/apache/spark/pull/34611#discussion_r754796220 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala ## @@ -130,6 +133,89 @@ class ColumnarBatchSuite extends SparkFunSuite { } } + testVector("Boolean APIs", 1024, BooleanType) { +column => + val reference = mutable.ArrayBuffer.empty[Boolean] + + var values = Array(true, false, true, false, false) + var bits = values.foldRight(0)((b, i) => i << 1 | (if (b) 1 else 0)).toByte + column.appendBooleans(2, bits, 0) + reference ++= values.slice(0, 2) + + column.appendBooleans(3, bits, 2) + reference ++= values.slice(2, 5) + + column.appendBooleans(6, true) + reference ++= Array.fill(6)(true) + + column.appendBoolean(false) + reference += false + + var idx = column.elementsAppended + + values = Array(true, true, false, true, false, true, false, true) + bits = values.foldRight(0)((b, i) => i << 1 | (if (b) 1 else 0)).toByte + column.putBooleans(idx, 2, bits, 0) + reference ++= values.slice(0, 2) + idx += 2 + + column.putBooleans(idx, 3, bits, 2) + reference ++= values.slice(2, 5) + idx += 3 + + column.putBooleans(idx, bits) + reference ++= values + idx += 8 + + column.putBoolean(idx, false) + reference += false + idx += 1 + + column.putBooleans(idx, 3, true) + reference ++= Array.fill(3)(true) + idx += 3 + + implicit def intToByte(i: Int): Byte = i.toByte + val buf = ByteBuffer.wrap(Array(0x33, 0x5A, 0xA5, 0xCC, 0x0F, 0xF0, 0xEE)) + val reader = new VectorizedPlainValuesReader() + reader.initFromPage(0, ByteBufferInputStream.wrap(buf)) + column.putBoolean(idx, reader.readBoolean) // bit index 0 + reference += true + idx += 1 + + reader.skipBooleans(1) + + column.putBoolean(idx, reader.readBoolean) // bit index 2 + reference += false + idx += 1 + + reader.skipBooleans(5) + + column.putBoolean(idx, reader.readBoolean) // bit index 8 + reference += false + idx += 1 + + reader.skipBooleans(8) Review 
comment: Never mind, the test suite has a confusing structure, it looks like they just test the final result. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
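Because the test builds its inputs with a `foldRight` bit fold, it can help to see the packing order spelled out. The following Java sketch mirrors that fold and checks the three `bit index` comments in the quoted test against the first two bytes of its buffer. `BitPackingSketch`, `pack` and `bitAt` are hypothetical helper names introduced for illustration only.

```java
// Hypothetical helpers, not part of the test suite.
final class BitPackingSketch {
  // Mirrors values.foldRight(0)((b, i) => i << 1 | (if (b) 1 else 0)).toByte:
  // folding from the last element puts values[0] in the least significant bit.
  static byte pack(boolean... values) {
    int bits = 0;
    for (int k = values.length - 1; k >= 0; k--) {
      bits = (bits << 1) | (values[k] ? 1 : 0);
    }
    return (byte) bits;
  }

  // Reads the boolean at an absolute bit index, least significant bit first.
  static boolean bitAt(byte[] buf, int bitIndex) {
    return ((buf[bitIndex >> 3] >> (bitIndex & 7)) & 1) != 0;
  }

  public static void main(String[] args) {
    byte packed = pack(true, true, false, true, false, true, false, true);
    System.out.printf("0x%02X%n", packed & 0xFF); // bits 0,1,3,5,7 set

    byte[] buf = {0x33, 0x5A, (byte) 0xA5, (byte) 0xCC, 0x0F, (byte) 0xF0, (byte) 0xEE};
    System.out.println(bitAt(buf, 0)); // test expects true
    System.out.println(bitAt(buf, 2)); // test expects false
    System.out.println(bitAt(buf, 8)); // test expects false
  }
}
```

The values agree with the `reference += ...` lines next to the `// bit index` comments in the diff: `0x33` is `0011 0011`, so bit 0 is set and bit 2 is clear, and `0x5A` is `0101 1010`, so bit 8 of the stream is clear.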
[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans
sadikovi commented on a change in pull request #34611: URL: https://github.com/apache/spark/pull/34611#discussion_r754795330 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ## @@ -53,19 +53,47 @@ public void skip() { throw new UnsupportedOperationException(); } + private void updateCurrentByte() { Review comment: Yes, makes sense. Let's keep it as is. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans
sadikovi commented on a change in pull request #34611: URL: https://github.com/apache/spark/pull/34611#discussion_r754794034 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ## @@ -53,19 +53,47 @@ public void skip() { throw new UnsupportedOperationException(); } + private void updateCurrentByte() { +try { + currentByte = (byte) in.read(); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read a byte", e); +} + } + @Override public final void readBooleans(int total, WritableColumnVector c, int rowId) { -// TODO: properly vectorize this -for (int i = 0; i < total; i++) { - c.putBoolean(rowId + i, readBoolean()); +int i = 0; +if (bitOffset > 0) { + i = Math.min(8 - bitOffset, total); + c.putBooleans(rowId, i, currentByte, bitOffset); + bitOffset = (bitOffset + i) & 7; +} +for (; i + 7 < total; i += 8) { + updateCurrentByte(); + c.putBooleans(rowId + i, currentByte); +} +if (i < total) { + updateCurrentByte(); + bitOffset = total - i; + c.putBooleans(rowId + i, bitOffset, currentByte, 0); } } @Override public final void skipBooleans(int total) { -// TODO: properly vectorize this -for (int i = 0; i < total; i++) { - readBoolean(); +// using >>3 instead of /8 below since Java division rounds towards zero i.e. (-1)/8=0 Review comment: I should have clarified my comment - total should be larger than 8 to skip at least 1 byte. Thanks for pointing it out. What I meant was that comment does not reflect the actual purpose of bit shift and is confusing. All you are trying to figure out is whether numBytesToSkip is positive or not so both approaches would work just fine. Bit shift tends to be faster than the division operation - I am not sure if JVM is smart enough to realise that these operations are equivalent in this case. By the way, shouldn't ``` if (bitOffset > 0) { updateCurrentByte(); } ``` be outside of the `if (numBytesToSkip >= 0) {`? 
In other words, what happens when the total is 9 and bitOffset is 0? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans
sadikovi commented on a change in pull request #34611: URL: https://github.com/apache/spark/pull/34611#discussion_r754794034 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ## @@ -53,19 +53,47 @@ public void skip() { throw new UnsupportedOperationException(); } + private void updateCurrentByte() { +try { + currentByte = (byte) in.read(); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read a byte", e); +} + } + @Override public final void readBooleans(int total, WritableColumnVector c, int rowId) { -// TODO: properly vectorize this -for (int i = 0; i < total; i++) { - c.putBoolean(rowId + i, readBoolean()); +int i = 0; +if (bitOffset > 0) { + i = Math.min(8 - bitOffset, total); + c.putBooleans(rowId, i, currentByte, bitOffset); + bitOffset = (bitOffset + i) & 7; +} +for (; i + 7 < total; i += 8) { + updateCurrentByte(); + c.putBooleans(rowId + i, currentByte); +} +if (i < total) { + updateCurrentByte(); + bitOffset = total - i; + c.putBooleans(rowId + i, bitOffset, currentByte, 0); } } @Override public final void skipBooleans(int total) { -// TODO: properly vectorize this -for (int i = 0; i < total; i++) { - readBoolean(); +// using >>3 instead of /8 below since Java division rounds towards zero i.e. (-1)/8=0 Review comment: I should have clarified my comment - total should be larger than 8-bitOffset to skip at least 1 byte. Thanks for pointing it out. What I meant was that comment does not reflect the actual purpose of bit shift and is confusing. All you are trying to figure out is whether numBytesToSkip is positive or not so both approaches would work just fine. Bit shift tends to be faster than the division operation - I am not sure if JVM is smart enough to realise that these operations are equivalent in this case. By the way, shouldn't ``` if (bitOffset > 0) { updateCurrentByte(); } ``` be outside of the `if (numBytesToSkip >= 0) {`? 
In other words, what happens when the total is 9 and bitOffset is 0? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
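The questions in this thread (is the negative intermediate needed, is `skipBooleans(0)` a no-op, what happens for `total = 9` with `bitOffset = 0`) can be checked mechanically. The sketch below is not the PR's `skipBooleans`; it is one possible restructuring, modelled on the `readBooleans` control flow from the diff, in which the byte count is never negative. All names (`SkipBooleansSketch`, `skipFast`, `skipNaive`, `agree`) are hypothetical and a `byte[]` stands in for the input stream.

```java
// Hypothetical model; an alternative shape for the skip logic, not the PR's code.
final class SkipBooleansSketch {
  private final byte[] data;
  private int pos = 0;
  private int bitOffset = 0;
  private byte currentByte = 0;

  SkipBooleansSketch(byte[] data) { this.data = data; }

  private void updateCurrentByte() { currentByte = data[pos++]; }

  boolean readBoolean() {
    if (bitOffset == 0) updateCurrentByte();
    boolean v = (currentByte & (1 << bitOffset)) != 0;
    bitOffset = (bitOffset + 1) & 7;
    return v;
  }

  // Reference behaviour: the loop that the PR replaces.
  void skipNaive(int total) {
    for (int i = 0; i < total; i++) readBoolean();
  }

  void skipFast(int total) {
    int i = 0;
    if (bitOffset > 0) {                 // finish the current byte first
      i = Math.min(8 - bitOffset, total);
      bitOffset = (bitOffset + i) & 7;
    }
    int wholeBytes = (total - i) / 8;    // total - i >= 0, so / and >> agree
    pos += wholeBytes;
    i += wholeBytes * 8;
    if (i < total) {                     // land inside the next byte
      updateCurrentByte();
      bitOffset = total - i;
    }
  }

  // Reads `prefix` bits, skips `skip` bits both ways, then compares the next `tail` bits.
  static boolean agree(byte[] data, int prefix, int skip, int tail) {
    SkipBooleansSketch a = new SkipBooleansSketch(data);
    SkipBooleansSketch b = new SkipBooleansSketch(data);
    for (int k = 0; k < prefix; k++) { a.readBoolean(); b.readBoolean(); }
    a.skipNaive(skip);
    b.skipFast(skip);
    for (int k = 0; k < tail; k++) {
      if (a.readBoolean() != b.readBoolean()) return false;
    }
    return a.pos == b.pos && a.bitOffset == b.bitOffset;
  }

  public static void main(String[] args) {
    byte[] data = {0x33, 0x5A, (byte) 0xA5, (byte) 0xCC, 0x0F, (byte) 0xF0, (byte) 0xEE};
    boolean ok = true;
    for (int prefix = 0; prefix < 8; prefix++) {   // every starting bitOffset
      for (int skip = 0; skip <= 24; skip++) {     // includes 0, 8 - bitOffset, and 9
        ok &= agree(data, prefix, skip, 8);
      }
    }
    System.out.println("all cases agree: " + ok);
  }
}
```

Comparing the bits read after the skip, rather than only calling the method, is what turns a case like `skipBooleans(0)` into a real no-op check, which is the concern raised in the test-suite comments.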
[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans
sadikovi commented on a change in pull request #34611: URL: https://github.com/apache/spark/pull/34611#discussion_r754789534 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala ## @@ -130,6 +133,89 @@ class ColumnarBatchSuite extends SparkFunSuite { } } + testVector("Boolean APIs", 1024, BooleanType) { +column => + val reference = mutable.ArrayBuffer.empty[Boolean] + + var values = Array(true, false, true, false, false) + var bits = values.foldRight(0)((b, i) => i << 1 | (if (b) 1 else 0)).toByte + column.appendBooleans(2, bits, 0) + reference ++= values.slice(0, 2) + + column.appendBooleans(3, bits, 2) + reference ++= values.slice(2, 5) + + column.appendBooleans(6, true) + reference ++= Array.fill(6)(true) + + column.appendBoolean(false) + reference += false + + var idx = column.elementsAppended + + values = Array(true, true, false, true, false, true, false, true) + bits = values.foldRight(0)((b, i) => i << 1 | (if (b) 1 else 0)).toByte + column.putBooleans(idx, 2, bits, 0) + reference ++= values.slice(0, 2) + idx += 2 + + column.putBooleans(idx, 3, bits, 2) + reference ++= values.slice(2, 5) + idx += 3 + + column.putBooleans(idx, bits) + reference ++= values + idx += 8 + + column.putBoolean(idx, false) + reference += false + idx += 1 + + column.putBooleans(idx, 3, true) + reference ++= Array.fill(3)(true) + idx += 3 + + implicit def intToByte(i: Int): Byte = i.toByte + val buf = ByteBuffer.wrap(Array(0x33, 0x5A, 0xA5, 0xCC, 0x0F, 0xF0, 0xEE)) + val reader = new VectorizedPlainValuesReader() + reader.initFromPage(0, ByteBufferInputStream.wrap(buf)) + column.putBoolean(idx, reader.readBoolean) // bit index 0 + reference += true + idx += 1 + + reader.skipBooleans(1) + + column.putBoolean(idx, reader.readBoolean) // bit index 2 + reference += false + idx += 1 + + reader.skipBooleans(5) + + column.putBoolean(idx, reader.readBoolean) // bit index 8 + reference += false + idx += 1 + + reader.skipBooleans(8) Review 
comment: We need to add a check to make sure the operation was a no-op, not just adding a call. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans
sadikovi commented on a change in pull request #34611: URL: https://github.com/apache/spark/pull/34611#discussion_r753867551 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala ## @@ -130,6 +133,89 @@ class ColumnarBatchSuite extends SparkFunSuite { } } + testVector("Boolean APIs", 1024, BooleanType) { +column => + val reference = mutable.ArrayBuffer.empty[Boolean] + + var values = Array(true, false, true, false, false) + var bits = values.foldRight(0)((b, i) => i << 1 | (if (b) 1 else 0)).toByte + column.appendBooleans(2, bits, 0) + reference ++= values.slice(0, 2) + + column.appendBooleans(3, bits, 2) + reference ++= values.slice(2, 5) + + column.appendBooleans(6, true) + reference ++= Array.fill(6)(true) + + column.appendBoolean(false) + reference += false + + var idx = column.elementsAppended + + values = Array(true, true, false, true, false, true, false, true) + bits = values.foldRight(0)((b, i) => i << 1 | (if (b) 1 else 0)).toByte + column.putBooleans(idx, 2, bits, 0) + reference ++= values.slice(0, 2) + idx += 2 + + column.putBooleans(idx, 3, bits, 2) + reference ++= values.slice(2, 5) + idx += 3 + + column.putBooleans(idx, bits) + reference ++= values + idx += 8 + + column.putBoolean(idx, false) + reference += false + idx += 1 + + column.putBooleans(idx, 3, true) + reference ++= Array.fill(3)(true) + idx += 3 + + implicit def intToByte(i: Int): Byte = i.toByte + val buf = ByteBuffer.wrap(Array(0x33, 0x5A, 0xA5, 0xCC, 0x0F, 0xF0, 0xEE)) + val reader = new VectorizedPlainValuesReader() + reader.initFromPage(0, ByteBufferInputStream.wrap(buf)) + column.putBoolean(idx, reader.readBoolean) // bit index 0 + reference += true + idx += 1 + + reader.skipBooleans(1) + + column.putBoolean(idx, reader.readBoolean) // bit index 2 + reference += false + idx += 1 + + reader.skipBooleans(5) + + column.putBoolean(idx, reader.readBoolean) // bit index 8 + reference += false + idx += 1 + + reader.skipBooleans(8) Review 
comment:
How about we include `skipBooleans(0)`?

## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ##
@@ -53,19 +53,47 @@ public void skip() {
     throw new UnsupportedOperationException();
   }
 
+  private void updateCurrentByte() {
+    try {
+      currentByte = (byte) in.read();
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read a byte", e);
+    }
+  }
+
   @Override
   public final void readBooleans(int total, WritableColumnVector c, int rowId) {
-    // TODO: properly vectorize this
-    for (int i = 0; i < total; i++) {
-      c.putBoolean(rowId + i, readBoolean());
+    int i = 0;
+    if (bitOffset > 0) {
+      i = Math.min(8 - bitOffset, total);
+      c.putBooleans(rowId, i, currentByte, bitOffset);
+      bitOffset = (bitOffset + i) & 7;
+    }
+    for (; i + 7 < total; i += 8) {
+      updateCurrentByte();
+      c.putBooleans(rowId + i, currentByte);
+    }
+    if (i < total) {
+      updateCurrentByte();
+      bitOffset = total - i;
+      c.putBooleans(rowId + i, bitOffset, currentByte, 0);
     }
   }
 
   @Override
   public final void skipBooleans(int total) {
-    // TODO: properly vectorize this
-    for (int i = 0; i < total; i++) {
-      readBoolean();
+    // using >>3 instead of /8 below since Java division rounds towards zero i.e. (-1)/8=0

Review comment:
Can we revisit this comment? It might be inaccurate. In this case, `(total - (8 - bitOffset))` should never be negative, otherwise it indicates a bug in the code. You can add a check if you are skipping 0 values, this should be a no-op. I am curious if there is a test for it.

## File path: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java ##
@@ -470,6 +493,18 @@ public final int appendBooleans(int count, boolean v) {
     return result;
   }
 
+  /**
+   * Append bits from [src[offset], src[offset + count])
+   * src must contain bit-packed 8 Booleans in the byte.

Review comment:
nit: `booleans` or `boolean values`.

## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ## @@ -53,19 +53,47 @@ public void skip() { throw new UnsupportedOperationException(); } + private void updateCurrentByte() { +try { + currentByte = (byte) in.read(); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read a byte", e); +} + } + @Override public final void readBooleans(int total, WritableColumnVector c, int rowId) { -// TODO: properly vectorize this -
[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans
sadikovi commented on a change in pull request #34611:
URL: https://github.com/apache/spark/pull/34611#discussion_r751668591

## File path: sql/core/benchmarks/DataSourceReadBenchmark-results.txt ##
@@ -1,252 +1,275 @@
+
+SQL Single Boolean Column Scan
+
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+SQL Single BOOLEAN Column Scan:              Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
+
+SQL CSV                                              13472          13878         574         1.2         856.5       1.0X
+SQL Json                                             10036          10477         623         1.6         638.0       1.3X
+SQL Parquet Vectorized                                 144            167          12       109.2           9.2      93.5X
+SQL Parquet MR                                        2224           2230           7         7.1         141.4       6.1X
+SQL ORC Vectorized                                     191            203           6        82.3          12.2      70.5X
+SQL ORC MR                                            1865           1870           7         8.4         118.6       7.2X
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+Parquet Reader Single BOOLEAN Column Scan:   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
+-
+ParquetReader Vectorized                               119            125           8       131.9           7.6       1.0X
+ParquetReader Vectorized -> Row                         60             63           2       260.2           3.8       2.0X
+
+
 SQL Single Numeric Column Scan
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
 SQL Single TINYINT Column Scan:              Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
-SQL CSV                                              15943          15956          18         1.0        1013.6       1.0X
-SQL Json                                              9109           9158          70         1.7         579.1       1.8X
-SQL Parquet Vectorized                                 168            191          16        93.8          10.7      95.1X
-SQL Parquet MR                                        1938           1950          17         8.1         123.2       8.2X
-SQL ORC Vectorized                                     191            199           6        82.2          12.2      83.3X
-SQL ORC MR                                            1523           1537          20        10.3          96.8      10.5X
-
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+SQL CSV                                              16820          16859          54         0.9        1069.4       1.0X
+SQL Json                                             11583          11586           4         1.4         736.4       1.5X
+SQL Parquet Vectorized                                 164            177          11        96.0          10.4     102.7X
+SQL Parquet MR                                        2839           2857          25         5.5         180.5       5.9X
+SQL ORC Vectorized                                     150            161           7       104.8           9.5     112.1X
+SQL ORC MR                                            1915           1923          12         8.2         121.7       8.8X
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
 Parquet Reader Single TINYINT Column Scan:   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
-
-ParquetReader Vectorized                               203            206           3        77.5          12.9       1.0X
-ParquetReader Vectorized -> Row
[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans
sadikovi commented on a change in pull request #34611:
URL: https://github.com/apache/spark/pull/34611#discussion_r751632478

## File path: sql/core/benchmarks/DataSourceReadBenchmark-results.txt ##
@@ -1,252 +1,275 @@
+
+SQL Single Boolean Column Scan
+
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+SQL Single BOOLEAN Column Scan:              Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
+
+SQL CSV                                              13472          13878         574         1.2         856.5       1.0X
+SQL Json                                             10036          10477         623         1.6         638.0       1.3X
+SQL Parquet Vectorized                                 144            167          12       109.2           9.2      93.5X
+SQL Parquet MR                                        2224           2230           7         7.1         141.4       6.1X
+SQL ORC Vectorized                                     191            203           6        82.3          12.2      70.5X
+SQL ORC MR                                            1865           1870           7         8.4         118.6       7.2X
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+Parquet Reader Single BOOLEAN Column Scan:   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
+-
+ParquetReader Vectorized                               119            125           8       131.9           7.6       1.0X
+ParquetReader Vectorized -> Row                         60             63           2       260.2           3.8       2.0X
+
+
 SQL Single Numeric Column Scan
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
 SQL Single TINYINT Column Scan:              Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
-SQL CSV                                              15943          15956          18         1.0        1013.6       1.0X
-SQL Json                                              9109           9158          70         1.7         579.1       1.8X
-SQL Parquet Vectorized                                 168            191          16        93.8          10.7      95.1X
-SQL Parquet MR                                        1938           1950          17         8.1         123.2       8.2X
-SQL ORC Vectorized                                     191            199           6        82.2          12.2      83.3X
-SQL ORC MR                                            1523           1537          20        10.3          96.8      10.5X
-
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+SQL CSV                                              16820          16859          54         0.9        1069.4       1.0X
+SQL Json                                             11583          11586           4         1.4         736.4       1.5X
+SQL Parquet Vectorized                                 164            177          11        96.0          10.4     102.7X
+SQL Parquet MR                                        2839           2857          25         5.5         180.5       5.9X
+SQL ORC Vectorized                                     150            161           7       104.8           9.5     112.1X
+SQL ORC MR                                            1915           1923          12         8.2         121.7       8.8X
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
 Parquet Reader Single TINYINT Column Scan:   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
-
-ParquetReader Vectorized                               203            206           3        77.5          12.9       1.0X
-ParquetReader Vectorized -> Row