[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans

2021-11-22 Thread GitBox


sadikovi commented on a change in pull request #34611:
URL: https://github.com/apache/spark/pull/34611#discussion_r754834288



##
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##
@@ -53,19 +53,50 @@ public void skip() {
 throw new UnsupportedOperationException();
   }
 
+  private void updateCurrentByte() {
+try {
+  currentByte = (byte) in.read();
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read a byte", e);
+}
+  }
+
   @Override
   public final void readBooleans(int total, WritableColumnVector c, int rowId) {
-// TODO: properly vectorize this
-for (int i = 0; i < total; i++) {
-  c.putBoolean(rowId + i, readBoolean());
+int i = 0;
+if (bitOffset > 0) {
+  i = Math.min(8 - bitOffset, total);
+  c.putBooleans(rowId, i, currentByte, bitOffset);
+  bitOffset = (bitOffset + i) & 7;
+}
+for (; i + 7 < total; i += 8) {
+  updateCurrentByte();
+  c.putBooleans(rowId + i, currentByte);
+}
+if (i < total) {
+  updateCurrentByte();
+  bitOffset = total - i;
+  c.putBooleans(rowId + i, bitOffset, currentByte, 0);
 }
   }
 
   @Override
   public final void skipBooleans(int total) {
-// TODO: properly vectorize this
-for (int i = 0; i < total; i++) {
-  readBoolean();
+// Using >>3 instead of /8 below. The difference is important when (total-(8-bitOffset))<0.
+// E.g. (-1)>>3=(-1) vs. (-1)/8=0. The latter incorrectly enters the if(numBytesToSkip>=0){.

Review comment:
   Why do you even need to enter the branch if numBytesToSkip is 0?
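
   For reference, the point the quoted code comment makes about `>>3` vs `/8` can be illustrated with a small standalone snippet; this is not part of the PR, and the `numBytesToSkip` name simply follows the review thread:
   ```java
   // Standalone illustration: Java's / rounds toward zero, while >> rounds toward
   // negative infinity, so the two only differ for negative operands. With a
   // strictly positive guard the choice becomes irrelevant.
   public class ShiftVsDivide {
     public static void main(String[] args) {
       System.out.println((-1) >> 3);  // prints -1
       System.out.println((-1) / 8);   // prints 0
       System.out.println(9 >> 3);     // prints 1
       System.out.println(9 / 8);      // prints 1

       int numBytesToSkip = 0;         // name taken from the review thread
       if (numBytesToSkip > 0) {       // a "> 0" guard never enters the no-op case
         System.out.println("skipping " + numBytesToSkip + " bytes");
       }
     }
   }
   ```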







[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans

2021-11-22 Thread GitBox


sadikovi commented on a change in pull request #34611:
URL: https://github.com/apache/spark/pull/34611#discussion_r754830520



##
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##
@@ -53,19 +53,47 @@ public void skip() {
 throw new UnsupportedOperationException();
   }
 
+  private void updateCurrentByte() {
+try {
+  currentByte = (byte) in.read();
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read a byte", e);
+}
+  }
+
   @Override
   public final void readBooleans(int total, WritableColumnVector c, int rowId) {
-// TODO: properly vectorize this
-for (int i = 0; i < total; i++) {
-  c.putBoolean(rowId + i, readBoolean());
+int i = 0;
+if (bitOffset > 0) {
+  i = Math.min(8 - bitOffset, total);

Review comment:
   Does the code work for total = 9 and bitOffset = 0?

##
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##
@@ -53,19 +53,47 @@ public void skip() {
 throw new UnsupportedOperationException();
   }
 
+  private void updateCurrentByte() {
+try {
+  currentByte = (byte) in.read();
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read a byte", e);
+}
+  }
+
   @Override
   public final void readBooleans(int total, WritableColumnVector c, int rowId) {
-// TODO: properly vectorize this
-for (int i = 0; i < total; i++) {
-  c.putBoolean(rowId + i, readBoolean());
+int i = 0;
+if (bitOffset > 0) {
+  i = Math.min(8 - bitOffset, total);

Review comment:
   Does the code work for total = 9 and bitOffset = 0? Can you add a test case for this?
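
   A minimal sketch of what such a test might look like, written against the APIs already used in this thread; the use of `OnHeapColumnVector` and the byte pattern are illustrative assumptions, not taken from the PR:
   ```java
   import java.nio.ByteBuffer;
   import org.apache.parquet.bytes.ByteBufferInputStream;
   import org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader;
   import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
   import org.apache.spark.sql.types.DataTypes;

   public class ReadNineBooleans {
     public static void main(String[] args) throws Exception {
       // Bits 0..7 come from 0xFF (all true); bit 8 is the lowest bit of 0x00 (false).
       ByteBuffer buf = ByteBuffer.wrap(new byte[] {(byte) 0xFF, 0x00});
       VectorizedPlainValuesReader reader = new VectorizedPlainValuesReader();
       reader.initFromPage(0, ByteBufferInputStream.wrap(buf));

       OnHeapColumnVector column = new OnHeapColumnVector(16, DataTypes.BooleanType);
       reader.readBooleans(9, column, 0);  // total = 9, fresh reader, i.e. bitOffset = 0

       for (int i = 0; i < 8; i++) {
         assert column.getBoolean(i);      // all 8 bits of the first page byte
       }
       assert !column.getBoolean(8);       // the 9th value spills into the second byte
     }
   }
   ```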







[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans

2021-11-22 Thread GitBox


sadikovi commented on a change in pull request #34611:
URL: https://github.com/apache/spark/pull/34611#discussion_r754826874



##
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##
@@ -53,19 +53,47 @@ public void skip() {
 throw new UnsupportedOperationException();
   }
 
+  private void updateCurrentByte() {
+try {
+  currentByte = (byte) in.read();
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read a byte", e);
+}
+  }
+
   @Override
   public final void readBooleans(int total, WritableColumnVector c, int rowId) {
-// TODO: properly vectorize this
-for (int i = 0; i < total; i++) {
-  c.putBoolean(rowId + i, readBoolean());
+int i = 0;
+if (bitOffset > 0) {
+  i = Math.min(8 - bitOffset, total);
+  c.putBooleans(rowId, i, currentByte, bitOffset);
+  bitOffset = (bitOffset + i) & 7;
+}
+for (; i + 7 < total; i += 8) {
+  updateCurrentByte();
+  c.putBooleans(rowId + i, currentByte);
+}
+if (i < total) {
+  updateCurrentByte();
+  bitOffset = total - i;
+  c.putBooleans(rowId + i, bitOffset, currentByte, 0);
 }
   }
 
   @Override
   public final void skipBooleans(int total) {
-// TODO: properly vectorize this
-for (int i = 0; i < total; i++) {
-  readBoolean();
+// using >>3 instead of /8 below since Java division rounds towards zero i.e. (-1)/8=0

Review comment:
   My concern is that the testing coverage is low and we could introduce subtle bugs that could be difficult to debug. I guess it is fine to keep as is.







[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans

2021-11-22 Thread GitBox


sadikovi commented on a change in pull request #34611:
URL: https://github.com/apache/spark/pull/34611#discussion_r754826446



##
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##
@@ -53,19 +53,47 @@ public void skip() {
 throw new UnsupportedOperationException();
   }
 
+  private void updateCurrentByte() {
+try {
+  currentByte = (byte) in.read();
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read a byte", e);
+}
+  }
+
   @Override
   public final void readBooleans(int total, WritableColumnVector c, int rowId) {
-// TODO: properly vectorize this
-for (int i = 0; i < total; i++) {
-  c.putBoolean(rowId + i, readBoolean());
+int i = 0;
+if (bitOffset > 0) {
+  i = Math.min(8 - bitOffset, total);
+  c.putBooleans(rowId, i, currentByte, bitOffset);
+  bitOffset = (bitOffset + i) & 7;
+}
+for (; i + 7 < total; i += 8) {
+  updateCurrentByte();
+  c.putBooleans(rowId + i, currentByte);
+}
+if (i < total) {
+  updateCurrentByte();
+  bitOffset = total - i;
+  c.putBooleans(rowId + i, bitOffset, currentByte, 0);
 }
   }
 
   @Override
   public final void skipBooleans(int total) {
-// TODO: properly vectorize this
-for (int i = 0; i < total; i++) {
-  readBoolean();
+// using >>3 instead of /8 below since Java division rounds towards zero i.e. (-1)/8=0

Review comment:
   Hmm... Why do you even need to go into the if statement when numBytesToSkip is 0?







[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans

2021-11-22 Thread GitBox


sadikovi commented on a change in pull request #34611:
URL: https://github.com/apache/spark/pull/34611#discussion_r754804617



##
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##
@@ -53,19 +53,47 @@ public void skip() {
 throw new UnsupportedOperationException();
   }
 
+  private void updateCurrentByte() {
+try {
+  currentByte = (byte) in.read();
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read a byte", e);
+}
+  }
+
   @Override
   public final void readBooleans(int total, WritableColumnVector c, int rowId) {
-// TODO: properly vectorize this
-for (int i = 0; i < total; i++) {
-  c.putBoolean(rowId + i, readBoolean());
+int i = 0;
+if (bitOffset > 0) {
+  i = Math.min(8 - bitOffset, total);

Review comment:
   Yes, I understand, but what about the `readBooleans()` method? Does it mean that I can only call readBooleans() once?







[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans

2021-11-22 Thread GitBox


sadikovi commented on a change in pull request #34611:
URL: https://github.com/apache/spark/pull/34611#discussion_r754797714



##
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##
@@ -53,19 +53,47 @@ public void skip() {
 throw new UnsupportedOperationException();
   }
 
+  private void updateCurrentByte() {
+try {
+  currentByte = (byte) in.read();
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read a byte", e);
+}
+  }
+
   @Override
   public final void readBooleans(int total, WritableColumnVector c, int rowId) {
-// TODO: properly vectorize this
-for (int i = 0; i < total; i++) {
-  c.putBoolean(rowId + i, readBoolean());
+int i = 0;
+if (bitOffset > 0) {
+  i = Math.min(8 - bitOffset, total);

Review comment:
   Never mind, I missed that you were updating the variable i.
   
   By the way, what happens when total = 8 - bitOffset? Do we need to call updateCurrentByte() somewhere for this case, as bitOffset will become 0?
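
   For what it's worth, the arithmetic of the quoted hunk can be traced for that boundary case with a tiny standalone sketch; the concrete values are illustrative, not from the PR:
   ```java
   public class BoundaryTrace {
     public static void main(String[] args) {
       int bitOffset = 5;                       // assumed state left over from earlier reads
       int total = 8 - bitOffset;               // the case in question: total == 3

       int i = Math.min(8 - bitOffset, total);  // 3: consumes the rest of currentByte
       bitOffset = (bitOffset + i) & 7;         // (5 + 3) & 7 == 0: exactly on a byte boundary

       System.out.println("i = " + i + ", bitOffset = " + bitOffset);
       System.out.println("byte-aligned loop would run: " + (i + 7 < total));  // false
       System.out.println("trailing branch would run:  " + (i < total));       // false

       // A later read that sees bitOffset == 0 bypasses the first branch and calls
       // updateCurrentByte() itself, so no extra refill seems to be needed here.
     }
   }
   ```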







[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans

2021-11-22 Thread GitBox


sadikovi commented on a change in pull request #34611:
URL: https://github.com/apache/spark/pull/34611#discussion_r754796220



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
##
@@ -130,6 +133,89 @@ class ColumnarBatchSuite extends SparkFunSuite {
   }
   }
 
+  testVector("Boolean APIs", 1024, BooleanType) {
+column =>
+  val reference = mutable.ArrayBuffer.empty[Boolean]
+
+  var values = Array(true, false, true, false, false)
+  var bits = values.foldRight(0)((b, i) => i << 1 | (if (b) 1 else 0)).toByte
+  column.appendBooleans(2, bits, 0)
+  reference ++= values.slice(0, 2)
+
+  column.appendBooleans(3, bits, 2)
+  reference ++= values.slice(2, 5)
+
+  column.appendBooleans(6, true)
+  reference ++= Array.fill(6)(true)
+
+  column.appendBoolean(false)
+  reference += false
+
+  var idx = column.elementsAppended
+
+  values = Array(true, true, false, true, false, true, false, true)
+  bits = values.foldRight(0)((b, i) => i << 1 | (if (b) 1 else 0)).toByte
+  column.putBooleans(idx, 2, bits, 0)
+  reference ++= values.slice(0, 2)
+  idx += 2
+
+  column.putBooleans(idx, 3, bits, 2)
+  reference ++= values.slice(2, 5)
+  idx += 3
+
+  column.putBooleans(idx, bits)
+  reference ++= values
+  idx += 8
+
+  column.putBoolean(idx, false)
+  reference += false
+  idx += 1
+
+  column.putBooleans(idx, 3, true)
+  reference ++= Array.fill(3)(true)
+  idx += 3
+
+  implicit def intToByte(i: Int): Byte = i.toByte
+  val buf = ByteBuffer.wrap(Array(0x33, 0x5A, 0xA5, 0xCC, 0x0F, 0xF0, 0xEE))
+  val reader = new VectorizedPlainValuesReader()
+  reader.initFromPage(0, ByteBufferInputStream.wrap(buf))
+  column.putBoolean(idx, reader.readBoolean) // bit index 0
+  reference += true
+  idx += 1
+
+  reader.skipBooleans(1)
+
+  column.putBoolean(idx, reader.readBoolean) // bit index 2
+  reference += false
+  idx += 1
+
+  reader.skipBooleans(5)
+
+  column.putBoolean(idx, reader.readBoolean) // bit index 8
+  reference += false
+  idx += 1
+
+  reader.skipBooleans(8)

Review comment:
   Never mind, the test suite has a confusing structure; it looks like they just test the final result.







[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans

2021-11-22 Thread GitBox


sadikovi commented on a change in pull request #34611:
URL: https://github.com/apache/spark/pull/34611#discussion_r754795330



##
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##
@@ -53,19 +53,47 @@ public void skip() {
 throw new UnsupportedOperationException();
   }
 
+  private void updateCurrentByte() {

Review comment:
   Yes, makes sense. Let's keep it as is. Thanks.







[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans

2021-11-22 Thread GitBox


sadikovi commented on a change in pull request #34611:
URL: https://github.com/apache/spark/pull/34611#discussion_r754794034



##
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##
@@ -53,19 +53,47 @@ public void skip() {
 throw new UnsupportedOperationException();
   }
 
+  private void updateCurrentByte() {
+try {
+  currentByte = (byte) in.read();
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read a byte", e);
+}
+  }
+
   @Override
   public final void readBooleans(int total, WritableColumnVector c, int rowId) {
-// TODO: properly vectorize this
-for (int i = 0; i < total; i++) {
-  c.putBoolean(rowId + i, readBoolean());
+int i = 0;
+if (bitOffset > 0) {
+  i = Math.min(8 - bitOffset, total);
+  c.putBooleans(rowId, i, currentByte, bitOffset);
+  bitOffset = (bitOffset + i) & 7;
+}
+for (; i + 7 < total; i += 8) {
+  updateCurrentByte();
+  c.putBooleans(rowId + i, currentByte);
+}
+if (i < total) {
+  updateCurrentByte();
+  bitOffset = total - i;
+  c.putBooleans(rowId + i, bitOffset, currentByte, 0);
 }
   }
 
   @Override
   public final void skipBooleans(int total) {
-// TODO: properly vectorize this
-for (int i = 0; i < total; i++) {
-  readBoolean();
+// using >>3 instead of /8 below since Java division rounds towards zero i.e. (-1)/8=0

Review comment:
   I should have clarified my comment: total should be larger than 8 to skip at least 1 byte. Thanks for pointing it out.
   
   What I meant was that the comment does not reflect the actual purpose of the bit shift and is confusing. All you are trying to figure out is whether numBytesToSkip is positive or not, so both approaches would work just fine. Bit shift tends to be faster than division, but I am not sure the JVM is smart enough to realise that the two operations are equivalent in this case.
   
   By the way, shouldn't
   ```
   if (bitOffset > 0) {
     updateCurrentByte();
   }
   ```
   be outside of the `if (numBytesToSkip >= 0) {`? In other words, what happens when the total is 9 and bitOffset is 0?
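
   To make the question concrete, one hypothetical shape of `skipBooleans()` with the refill hoisted out of the byte-skipping branch might look as follows. The PR's actual method body is not quoted in this thread, the variable names follow the review comments, and `ByteBufferInputStream#skipFully` is assumed to be available:
   ```java
   // Hypothetical sketch only, not the PR's implementation.
   public final void skipBooleans(int total) {
     if (total <= 0) return;                      // skipping zero values is a no-op
     int bitsLeftInCurrentByte = bitOffset > 0 ? 8 - bitOffset : 0;
     if (total <= bitsLeftInCurrentByte) {
       bitOffset = (bitOffset + total) & 7;       // stay within currentByte
       return;
     }
     int remaining = total - bitsLeftInCurrentByte;
     int numBytesToSkip = remaining >> 3;         // whole bytes to jump over
     bitOffset = remaining & 7;                   // leftover bits in the byte after those
     try {
       in.skipFully(numBytesToSkip);              // assumes ByteBufferInputStream#skipFully
     } catch (IOException e) {
       throw new ParquetDecodingException("Failed to skip bytes", e);
     }
     if (bitOffset > 0) {
       updateCurrentByte();                       // refill regardless of how many bytes were skipped
     }
   }
   ```
   With this shape, total = 9 and bitOffset = 0 skips one whole byte, loads the next byte, and leaves bitOffset = 1.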







[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans

2021-11-22 Thread GitBox


sadikovi commented on a change in pull request #34611:
URL: https://github.com/apache/spark/pull/34611#discussion_r754794034



##
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##
@@ -53,19 +53,47 @@ public void skip() {
 throw new UnsupportedOperationException();
   }
 
+  private void updateCurrentByte() {
+try {
+  currentByte = (byte) in.read();
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read a byte", e);
+}
+  }
+
   @Override
   public final void readBooleans(int total, WritableColumnVector c, int rowId) {
-// TODO: properly vectorize this
-for (int i = 0; i < total; i++) {
-  c.putBoolean(rowId + i, readBoolean());
+int i = 0;
+if (bitOffset > 0) {
+  i = Math.min(8 - bitOffset, total);
+  c.putBooleans(rowId, i, currentByte, bitOffset);
+  bitOffset = (bitOffset + i) & 7;
+}
+for (; i + 7 < total; i += 8) {
+  updateCurrentByte();
+  c.putBooleans(rowId + i, currentByte);
+}
+if (i < total) {
+  updateCurrentByte();
+  bitOffset = total - i;
+  c.putBooleans(rowId + i, bitOffset, currentByte, 0);
 }
   }
 
   @Override
   public final void skipBooleans(int total) {
-// TODO: properly vectorize this
-for (int i = 0; i < total; i++) {
-  readBoolean();
+// using >>3 instead of /8 below since Java division rounds towards zero i.e. (-1)/8=0

Review comment:
   I should have clarified my comment: total should be larger than 8 - bitOffset to skip at least 1 byte. Thanks for pointing it out.
   
   What I meant was that the comment does not reflect the actual purpose of the bit shift and is confusing. All you are trying to figure out is whether numBytesToSkip is positive or not, so both approaches would work just fine. Bit shift tends to be faster than division, but I am not sure the JVM is smart enough to realise that the two operations are equivalent in this case.
   
   By the way, shouldn't
   ```
   if (bitOffset > 0) {
     updateCurrentByte();
   }
   ```
   be outside of the `if (numBytesToSkip >= 0) {`? In other words, what happens when the total is 9 and bitOffset is 0?







[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans

2021-11-22 Thread GitBox


sadikovi commented on a change in pull request #34611:
URL: https://github.com/apache/spark/pull/34611#discussion_r754789534



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
##
@@ -130,6 +133,89 @@ class ColumnarBatchSuite extends SparkFunSuite {
   }
   }
 
+  testVector("Boolean APIs", 1024, BooleanType) {
+column =>
+  val reference = mutable.ArrayBuffer.empty[Boolean]
+
+  var values = Array(true, false, true, false, false)
+  var bits = values.foldRight(0)((b, i) => i << 1 | (if (b) 1 else 0)).toByte
+  column.appendBooleans(2, bits, 0)
+  reference ++= values.slice(0, 2)
+
+  column.appendBooleans(3, bits, 2)
+  reference ++= values.slice(2, 5)
+
+  column.appendBooleans(6, true)
+  reference ++= Array.fill(6)(true)
+
+  column.appendBoolean(false)
+  reference += false
+
+  var idx = column.elementsAppended
+
+  values = Array(true, true, false, true, false, true, false, true)
+  bits = values.foldRight(0)((b, i) => i << 1 | (if (b) 1 else 0)).toByte
+  column.putBooleans(idx, 2, bits, 0)
+  reference ++= values.slice(0, 2)
+  idx += 2
+
+  column.putBooleans(idx, 3, bits, 2)
+  reference ++= values.slice(2, 5)
+  idx += 3
+
+  column.putBooleans(idx, bits)
+  reference ++= values
+  idx += 8
+
+  column.putBoolean(idx, false)
+  reference += false
+  idx += 1
+
+  column.putBooleans(idx, 3, true)
+  reference ++= Array.fill(3)(true)
+  idx += 3
+
+  implicit def intToByte(i: Int): Byte = i.toByte
+  val buf = ByteBuffer.wrap(Array(0x33, 0x5A, 0xA5, 0xCC, 0x0F, 0xF0, 0xEE))
+  val reader = new VectorizedPlainValuesReader()
+  reader.initFromPage(0, ByteBufferInputStream.wrap(buf))
+  column.putBoolean(idx, reader.readBoolean) // bit index 0
+  reference += true
+  idx += 1
+
+  reader.skipBooleans(1)
+
+  column.putBoolean(idx, reader.readBoolean) // bit index 2
+  reference += false
+  idx += 1
+
+  reader.skipBooleans(5)
+
+  column.putBoolean(idx, reader.readBoolean) // bit index 8
+  reference += false
+  idx += 1
+
+  reader.skipBooleans(8)

Review comment:
   We need to add a check to make sure the operation was a no-op, not just adding a call.
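
   One way such a check could look, as a standalone Java sketch rather than the suite's Scala code; the byte value is an illustrative assumption:
   ```java
   import java.nio.ByteBuffer;
   import org.apache.parquet.bytes.ByteBufferInputStream;
   import org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader;

   public class SkipZeroIsNoOp {
     public static void main(String[] args) throws Exception {
       // 0x02 = 0b00000010: bit 0 = false, bit 1 = true, bit 2 = false.
       ByteBuffer buf = ByteBuffer.wrap(new byte[] {0x02});
       VectorizedPlainValuesReader reader = new VectorizedPlainValuesReader();
       reader.initFromPage(0, ByteBufferInputStream.wrap(buf));

       boolean first = reader.readBoolean();   // bit 0 -> false
       reader.skipBooleans(0);                 // must not consume anything
       boolean second = reader.readBoolean();  // bit 1 -> true only if the skip was a no-op
       boolean third = reader.readBoolean();   // bit 2 -> false

       assert !first && second && !third;
     }
   }
   ```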







[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans

2021-11-21 Thread GitBox


sadikovi commented on a change in pull request #34611:
URL: https://github.com/apache/spark/pull/34611#discussion_r753867551



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
##
@@ -130,6 +133,89 @@ class ColumnarBatchSuite extends SparkFunSuite {
   }
   }
 
+  testVector("Boolean APIs", 1024, BooleanType) {
+column =>
+  val reference = mutable.ArrayBuffer.empty[Boolean]
+
+  var values = Array(true, false, true, false, false)
+  var bits = values.foldRight(0)((b, i) => i << 1 | (if (b) 1 else 0)).toByte
+  column.appendBooleans(2, bits, 0)
+  reference ++= values.slice(0, 2)
+
+  column.appendBooleans(3, bits, 2)
+  reference ++= values.slice(2, 5)
+
+  column.appendBooleans(6, true)
+  reference ++= Array.fill(6)(true)
+
+  column.appendBoolean(false)
+  reference += false
+
+  var idx = column.elementsAppended
+
+  values = Array(true, true, false, true, false, true, false, true)
+  bits = values.foldRight(0)((b, i) => i << 1 | (if (b) 1 else 0)).toByte
+  column.putBooleans(idx, 2, bits, 0)
+  reference ++= values.slice(0, 2)
+  idx += 2
+
+  column.putBooleans(idx, 3, bits, 2)
+  reference ++= values.slice(2, 5)
+  idx += 3
+
+  column.putBooleans(idx, bits)
+  reference ++= values
+  idx += 8
+
+  column.putBoolean(idx, false)
+  reference += false
+  idx += 1
+
+  column.putBooleans(idx, 3, true)
+  reference ++= Array.fill(3)(true)
+  idx += 3
+
+  implicit def intToByte(i: Int): Byte = i.toByte
+  val buf = ByteBuffer.wrap(Array(0x33, 0x5A, 0xA5, 0xCC, 0x0F, 0xF0, 0xEE))
+  val reader = new VectorizedPlainValuesReader()
+  reader.initFromPage(0, ByteBufferInputStream.wrap(buf))
+  column.putBoolean(idx, reader.readBoolean) // bit index 0
+  reference += true
+  idx += 1
+
+  reader.skipBooleans(1)
+
+  column.putBoolean(idx, reader.readBoolean) // bit index 2
+  reference += false
+  idx += 1
+
+  reader.skipBooleans(5)
+
+  column.putBoolean(idx, reader.readBoolean) // bit index 8
+  reference += false
+  idx += 1
+
+  reader.skipBooleans(8)

Review comment:
   How about we include `skipBooleans(0)`?

##
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##
@@ -53,19 +53,47 @@ public void skip() {
 throw new UnsupportedOperationException();
   }
 
+  private void updateCurrentByte() {
+try {
+  currentByte = (byte) in.read();
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read a byte", e);
+}
+  }
+
   @Override
   public final void readBooleans(int total, WritableColumnVector c, int rowId) {
-// TODO: properly vectorize this
-for (int i = 0; i < total; i++) {
-  c.putBoolean(rowId + i, readBoolean());
+int i = 0;
+if (bitOffset > 0) {
+  i = Math.min(8 - bitOffset, total);
+  c.putBooleans(rowId, i, currentByte, bitOffset);
+  bitOffset = (bitOffset + i) & 7;
+}
+for (; i + 7 < total; i += 8) {
+  updateCurrentByte();
+  c.putBooleans(rowId + i, currentByte);
+}
+if (i < total) {
+  updateCurrentByte();
+  bitOffset = total - i;
+  c.putBooleans(rowId + i, bitOffset, currentByte, 0);
 }
   }
 
   @Override
   public final void skipBooleans(int total) {
-// TODO: properly vectorize this
-for (int i = 0; i < total; i++) {
-  readBoolean();
+// using >>3 instead of /8 below since Java division rounds towards zero i.e. (-1)/8=0

Review comment:
   Can we revisit this comment? It might be inaccurate. In this case, `(total - (8 - bitOffset))` should never be negative; otherwise it indicates a bug in the code.
   
   You can add a check if you are skipping 0 values; this should be a no-op. I am curious whether there is a test for it.

##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
##
@@ -470,6 +493,18 @@ public final int appendBooleans(int count, boolean v) {
 return result;
   }
 
+  /**
+   * Append bits from [src[offset], src[offset + count])
+   * src must contain bit-packed 8 Booleans in the byte.

Review comment:
   nit: `booleans` or `boolean values`.

##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##
@@ -53,19 +53,47 @@ public void skip() {
 throw new UnsupportedOperationException();
   }
 
+  private void updateCurrentByte() {
+try {
+  currentByte = (byte) in.read();
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read a byte", e);
+}
+  }
+
   @Override
   public final void readBooleans(int total, WritableColumnVector c, int rowId) {
-// TODO: properly vectorize this
-

[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans

2021-11-17 Thread GitBox


sadikovi commented on a change in pull request #34611:
URL: https://github.com/apache/spark/pull/34611#discussion_r751668591



##
File path: sql/core/benchmarks/DataSourceReadBenchmark-results.txt
##
@@ -1,252 +1,275 @@
+================================================================================================================================
+SQL Single Boolean Column Scan
+================================================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+SQL Single BOOLEAN Column Scan:             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+--------------------------------------------------------------------------------------------------------------------------------
+SQL CSV                                             13472          13878         574          1.2         856.5       1.0X
+SQL Json                                            10036          10477         623          1.6         638.0       1.3X
+SQL Parquet Vectorized                                144            167          12        109.2           9.2      93.5X
+SQL Parquet MR                                       2224           2230           7          7.1         141.4       6.1X
+SQL ORC Vectorized                                    191            203           6         82.3          12.2      70.5X
+SQL ORC MR                                           1865           1870           7          8.4         118.6       7.2X
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+Parquet Reader Single BOOLEAN Column Scan:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+--------------------------------------------------------------------------------------------------------------------------------
+ParquetReader Vectorized                              119            125           8        131.9           7.6       1.0X
+ParquetReader Vectorized -> Row                        60             63           2        260.2           3.8       2.0X
+
+
 
 ================================================================================================================================
 SQL Single Numeric Column Scan
 ================================================================================================================================
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
 SQL Single TINYINT Column Scan:             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 --------------------------------------------------------------------------------------------------------------------------------
-SQL CSV                                             15943          15956          18          1.0        1013.6       1.0X
-SQL Json                                             9109           9158          70          1.7         579.1       1.8X
-SQL Parquet Vectorized                                168            191          16         93.8          10.7      95.1X
-SQL Parquet MR                                       1938           1950          17          8.1         123.2       8.2X
-SQL ORC Vectorized                                    191            199           6         82.2          12.2      83.3X
-SQL ORC MR                                           1523           1537          20         10.3          96.8      10.5X
-
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+SQL CSV                                             16820          16859          54          0.9        1069.4       1.0X
+SQL Json                                            11583          11586           4          1.4         736.4       1.5X
+SQL Parquet Vectorized                                164            177          11         96.0          10.4     102.7X
+SQL Parquet MR                                       2839           2857          25          5.5         180.5       5.9X
+SQL ORC Vectorized                                    150            161           7        104.8           9.5     112.1X
+SQL ORC MR                                           1915           1923          12          8.2         121.7       8.8X
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
 Parquet Reader Single TINYINT Column Scan:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 --------------------------------------------------------------------------------------------------------------------------------
-ParquetReader Vectorized                              203            206           3         77.5          12.9       1.0X
-ParquetReader Vectorized -> Row

[GitHub] [spark] sadikovi commented on a change in pull request #34611: [SPARK-35867][SQL] Enable vectorized read for VectorizedPlainValuesReader.readBooleans

2021-11-17 Thread GitBox


sadikovi commented on a change in pull request #34611:
URL: https://github.com/apache/spark/pull/34611#discussion_r751632478



##
File path: sql/core/benchmarks/DataSourceReadBenchmark-results.txt
##
@@ -1,252 +1,275 @@
+================================================================================================================================
+SQL Single Boolean Column Scan
+================================================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+SQL Single BOOLEAN Column Scan:             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+--------------------------------------------------------------------------------------------------------------------------------
+SQL CSV                                             13472          13878         574          1.2         856.5       1.0X
+SQL Json                                            10036          10477         623          1.6         638.0       1.3X
+SQL Parquet Vectorized                                144            167          12        109.2           9.2      93.5X
+SQL Parquet MR                                       2224           2230           7          7.1         141.4       6.1X
+SQL ORC Vectorized                                    191            203           6         82.3          12.2      70.5X
+SQL ORC MR                                           1865           1870           7          8.4         118.6       7.2X
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+Parquet Reader Single BOOLEAN Column Scan:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+--------------------------------------------------------------------------------------------------------------------------------
+ParquetReader Vectorized                              119            125           8        131.9           7.6       1.0X
+ParquetReader Vectorized -> Row                        60             63           2        260.2           3.8       2.0X
+
+
 
 ================================================================================================================================
 SQL Single Numeric Column Scan
 ================================================================================================================================
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
 SQL Single TINYINT Column Scan:             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 --------------------------------------------------------------------------------------------------------------------------------
-SQL CSV                                             15943          15956          18          1.0        1013.6       1.0X
-SQL Json                                             9109           9158          70          1.7         579.1       1.8X
-SQL Parquet Vectorized                                168            191          16         93.8          10.7      95.1X
-SQL Parquet MR                                       1938           1950          17          8.1         123.2       8.2X
-SQL ORC Vectorized                                    191            199           6         82.2          12.2      83.3X
-SQL ORC MR                                           1523           1537          20         10.3          96.8      10.5X
-
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+SQL CSV                                             16820          16859          54          0.9        1069.4       1.0X
+SQL Json                                            11583          11586           4          1.4         736.4       1.5X
+SQL Parquet Vectorized                                164            177          11         96.0          10.4     102.7X
+SQL Parquet MR                                       2839           2857          25          5.5         180.5       5.9X
+SQL ORC Vectorized                                    150            161           7        104.8           9.5     112.1X
+SQL ORC MR                                           1915           1923          12          8.2         121.7       8.8X
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
 Parquet Reader Single TINYINT Column Scan:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 --------------------------------------------------------------------------------------------------------------------------------
-ParquetReader Vectorized                              203            206           3         77.5          12.9       1.0X
-ParquetReader Vectorized -> Row