[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21070
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186793493

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +59,157 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) {
     }
   }

+  private ByteBuffer getBuffer(int length) {
+    try {
+      return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+    }
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int rowId) {
-    c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 4 * total;
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    if (buffer.hasArray()) {
+      int offset = buffer.arrayOffset() + buffer.position();
+      c.putIntsLittleEndian(rowId, total, buffer.array(), offset);
+    } else {
+      for (int i = 0; i < total; i += 1) {
+        c.putInt(rowId + i, buffer.getInt());
+      }
+    }
   }

   @Override
   public final void readLongs(int total, WritableColumnVector c, int rowId) {
-    c.putLongsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 8 * total;
+    int requiredBytes = total * 8;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    if (buffer.hasArray()) {
+      int offset = buffer.arrayOffset() + buffer.position();
+      c.putLongsLittleEndian(rowId, total, buffer.array(), offset);
+    } else {
+      for (int i = 0; i < total; i += 1) {
+        c.putLong(rowId + i, buffer.getLong());
+      }
+    }
   }

   @Override
   public final void readFloats(int total, WritableColumnVector c, int rowId) {
-    c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 4 * total;
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    if (buffer.hasArray()) {
+      int offset = buffer.arrayOffset() + buffer.position();
+      c.putFloats(rowId, total, buffer.array(), offset);
+    } else {
+      for (int i = 0; i < total; i += 1) {
+        c.putFloat(rowId + i, buffer.getFloat());
+      }
+    }
   }

   @Override
   public final void readDoubles(int total, WritableColumnVector c, int rowId) {
-    c.putDoubles(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 8 * total;
+    int requiredBytes = total * 8;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    if (buffer.hasArray()) {
+      int offset = buffer.arrayOffset() + buffer.position();
+      c.putDoubles(rowId, total, buffer.array(), offset);
+    } else {
+      for (int i = 0; i < total; i += 1) {
+        c.putDouble(rowId + i, buffer.getDouble());
+      }
+    }
   }

   @Override
   public final void readBytes(int total, WritableColumnVector c, int rowId) {
-    for (int i = 0; i < total; i++) {
-      // Bytes are stored as a 4-byte little endian int. Just read the first byte.
-      // TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
-      c.putByte(rowId + i, Platform.getByte(buffer, offset));
-      offset += 4;
+    // Bytes are stored as a 4-byte little endian int. Just read the first byte.
+    // TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    for (int i = 0; i < total; i += 1) {
+      c.putByte(rowId + i, buffer.get());
+      // skip the next 3 bytes
+      buffer.position(buffer.position() + 3);
     }
   }

   @Override
   public final boolean readBoolean() {
-    byte b = Platform.getByte(buffer, offset);
-    boolean v = (b & (1 << bitOffset)) != 0;
+    // TODO: vectorize decoding and keep boolean[] instead of currentByte
+    if (bitOffset == 0) {
+      try {
+        currentByte = (byte) in.read();
+      } catch (IOException e) {
+        throw new ParquetDecodingException("Failed to read a byte", e);
+      }
+    }
+
+    boolean v = (currentByte & (1 << bitOffset)) != 0;
     bitOffset += 1;
     if (bitOffset == 8) {
       bitOffset = 0;
-      offset++;
     }
     return v;
   }

   @Override
   public final int readInteger() {
-    int v = Platform.getInt(buffer, offset);
-    if (bigEndianPlatform) {
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186791067

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
(quotes the same hunk shown in full above)
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186789552

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java ---
@@ -619,32 +608,37 @@ private int ceil8(int value) {
   /**
    * Reads the next group.
    */
-  private void readNextGroup() {
-    int header = readUnsignedVarInt();
-    this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
-    switch (mode) {
-      case RLE:
-        this.currentCount = header >>> 1;
-        this.currentValue = readIntLittleEndianPaddedOnBitWidth();
-        return;
-      case PACKED:
-        int numGroups = header >>> 1;
-        this.currentCount = numGroups * 8;
-        int bytesToRead = ceil8(this.currentCount * this.bitWidth);
-
-        if (this.currentBuffer.length < this.currentCount) {
-          this.currentBuffer = new int[this.currentCount];
-        }
-        currentBufferIdx = 0;
-        int valueIndex = 0;
-        for (int byteIndex = offset; valueIndex < this.currentCount; byteIndex += this.bitWidth) {
-          this.packer.unpack8Values(in, byteIndex, this.currentBuffer, valueIndex);
-          valueIndex += 8;
-        }
-        offset += bytesToRead;
-        return;
-      default:
-        throw new ParquetDecodingException("not a valid mode " + this.mode);
+  private void readNextGroup() {
+    try {
+      int header = readUnsignedVarInt();
+      this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
+      switch (mode) {
+        case RLE:
+          this.currentCount = header >>> 1;
+          this.currentValue = readIntLittleEndianPaddedOnBitWidth();
+          return;
+        case PACKED:
+          int numGroups = header >>> 1;
+          this.currentCount = numGroups * 8;
+
+          if (this.currentBuffer.length < this.currentCount) {
+            this.currentBuffer = new int[this.currentCount];
+          }
+          currentBufferIdx = 0;
+          int valueIndex = 0;
+          while (valueIndex < this.currentCount) {
+            // values are bit packed 8 at a time, so reading bitWidth will always work
+            ByteBuffer buffer = in.slice(bitWidth);
+            this.packer.unpack8Values(
+                buffer, buffer.arrayOffset() + buffer.position(), this.currentBuffer, valueIndex);
--- End diff --

Good catch. Fixed to remove the call to `arrayOffset`. It should work with both on- and off-heap buffers now.
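For readers following the `arrayOffset` fix, here is a minimal, self-contained JDK sketch (not the PR's code) of the distinction involved: `arrayOffset()` only applies when indexing the backing array directly, while buffer-relative reads already start at the slice's own index zero.

```java
import java.nio.ByteBuffer;

public class BufferIndexing {
  public static void main(String[] args) {
    ByteBuffer heap = ByteBuffer.allocate(16);
    for (byte i = 0; i < 16; i++) heap.put(i);
    heap.position(4);
    ByteBuffer slice = heap.slice();  // shares the bytes, no copy

    // Buffer-relative access: index 0 is the slice's own start.
    System.out.println(slice.get(0));  // 4

    // Array access is where arrayOffset() belongs: the slice begins
    // 4 bytes into the shared backing array.
    System.out.println(slice.array()[slice.arrayOffset()]);  // 4
  }
}
```

Mixing `arrayOffset()` into a buffer-relative index double-counts the slice's start for heap buffers, and `arrayOffset()` itself throws for direct (off-heap) buffers, which is why removing it makes the call work for both.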
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186785996

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
(quotes the same hunk shown in full above)
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186604021

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
(quotes the same hunk shown in full above)
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186603708

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java ---
(quotes the same readNextGroup() hunk shown above, at the `unpack8Values` call)
--- End diff --

shall we assume the `ByteBuffer` may not be on-heap?
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186603010

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
(quotes the same hunk shown in full above)
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186602846

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
(quotes the same hunk shown in full above)
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186469029

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -345,7 +345,7 @@ object SQLConf {
       "snappy, gzip, lzo.")
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
-    .checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo"))
+    .checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo", "lz4", "brotli", "zstd"))
--- End diff --

Done.
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186468896

--- Diff: dev/deps/spark-deps-hadoop-2.7 ---
@@ -163,13 +163,13 @@ orc-mapreduce-1.4.3-nohive.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.10.0.jar
+parquet-common-1.10.0.jar
+parquet-encoding-1.10.0.jar
+parquet-format-2.4.0.jar
+parquet-hadoop-1.10.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.10.0.jar
--- End diff --

Done.
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186464674

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
(quotes the readIntegers hunk shown above, at `if (buffer.hasArray()) {`)
--- End diff --

No, there is no guarantee that the buffer from Parquet is on the heap.
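A quick JDK illustration of that point (a standalone sketch, not Spark code): only heap buffers expose a backing array, so any `array()`-based fast path has to be guarded.

```java
import java.nio.ByteBuffer;

public class HeapVsDirect {
  public static void main(String[] args) {
    ByteBuffer heap = ByteBuffer.allocate(4);
    ByteBuffer direct = ByteBuffer.allocateDirect(4);

    System.out.println(heap.hasArray());    // true: array() is safe here
    System.out.println(direct.hasArray());  // false: array() would throw
  }
}
```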
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186464557

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
(quotes the getBuffer hunk shown above, at `return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);`)
--- End diff --

No, `slice` doesn't copy. That's why we're using `ByteBuffer` now, to avoid copy operations. Setting the byte order to `LITTLE_ENDIAN` is correct because it is for the buffer, and Parquet buffers store values in little endian: https://github.com/apache/parquet-format/blob/master/Encodings.md.
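Both claims are easy to check with plain `java.nio` (a standalone sketch, not the PR's code): `slice()` shares the underlying bytes, and `order()` describes how the data is laid out, independent of the platform's native endianness.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class SliceAndOrder {
  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocate(8);
    ByteBuffer slice = buf.slice().order(ByteOrder.LITTLE_ENDIAN);

    buf.put(0, (byte) 1);              // write through the original...
    System.out.println(slice.get(0));  // ...and it shows in the slice: 1

    // The bytes 01 00 00 00 decode to 1 only when read little-endian,
    // which is how Parquet plain encoding lays values out.
    System.out.println(slice.getInt(0));                              // 1
    System.out.println(slice.order(ByteOrder.BIG_ENDIAN).getInt(0));  // 16777216
  }
}
```

The buffer's byte order is a property of the data being decoded, not of the host platform, so setting it unconditionally is safe.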
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186358096

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
(quotes the getBuffer hunk shown above, at `return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);`)
--- End diff --

previously we only call `.order(ByteOrder.LITTLE_ENDIAN)` if it's a big-endian platform. Is it OK to always call it?
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186357714

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
(quotes the readIntegers hunk shown above, at `if (buffer.hasArray()) {`)
--- End diff --

shall we assert `buffer.hasArray()` is always true?
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186357371

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
(quotes the getBuffer hunk shown above, at `return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);`)
--- End diff --

does `in.slice(length)` do copy?
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186325093

--- Diff: dev/deps/spark-deps-hadoop-2.7 ---
(quotes the same parquet version bumps shown above)
--- End diff --

(btw, don't forget to fix https://github.com/apache/spark/blob/ce7ba2e98e0a3b038e881c271b5905058c43155b/dev/deps/spark-deps-hadoop-3.1#L184-L190 too)
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186276440

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
(quotes the same `checkValues` change shown above)
--- End diff --

Could you update [sql-programming-guide.md](https://github.com/apache/spark/blame/master/docs/sql-programming-guide.md#L967) together?
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r184156731

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
@@ -503,7 +503,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
       case plan: InMemoryRelation => plan
     }.head
     // InMemoryRelation's stats is file size before the underlying RDD is materialized
-    assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+    assert(inMemoryRelation.computeStats().sizeInBytes === 800)
--- End diff --

This is data dependent so it is hard to estimate. We write the stats for older readers when the type uses a signed sort order, so it is limited to mostly primitive types and won't be written for byte arrays or utf8 data. That limits the size to 16 bytes + thrift overhead per page and you might have about 100 pages per row group. So 1.5k per 128MB, which is about 0.001%.
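The back-of-envelope above, spelled out (the figures are the comment's rough assumptions, not measurements):

```java
public class StatsOverhead {
  public static void main(String[] args) {
    long statsPerPage = 16;                   // bytes of duplicated min/max, plus some Thrift framing
    long pagesPerRowGroup = 100;              // rough upper bound per the comment
    long rowGroupBytes = 128L * 1024 * 1024;  // 128 MB

    long extra = statsPerPage * pagesPerRowGroup;              // ~1.6 KB
    double pct = 100.0 * extra / rowGroupBytes;
    System.out.printf("%d bytes extra, %.4f%%%n", extra, pct); // 1600 bytes, 0.0012%
  }
}
```

On that estimate, the 60-byte jump in the test (740 to 800) is an artifact of a tiny file rather than a representative size increase.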
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r184144930

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
(quotes the same assertion change shown above)
--- End diff --

Then it looks fine. But are the new max/min metadata fields added to the file metadata, the column metadata, and the page header metadata? Is there any formula we can use to calculate the size increase?
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r184139736

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
(quotes the same assertion change shown above)
--- End diff --

Parquet fixed a problem with value ordering in statistics, which required adding new metadata min and max fields. For older readers, Parquet also writes the old values when it makes sense to. This is a slight increase in overhead, which is more noticeable when files contain just a few records. Don't be alarmed at the percentage difference here, it is just a small file. Parquet isn't increasing file sizes by 8%, that would be silly.
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r184116235

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
(quotes the same assertion change shown above)
--- End diff --

Our optimizer uses the statistics to decide the plans (e.g., in join algorithm selection). Thus, the plans could be completely different if the file size increases by 8 percent. Could you give us more context? cc @rdblue
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r182563063

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +59,159 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) {
     }
   }

+  private ByteBuffer getBuffer(int length) {
+    try {
+      return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+    }
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int rowId) {
-    c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 4 * total;
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    if (buffer.hasArray()) {
+      int offset = buffer.arrayOffset() + buffer.position();
+      c.putIntsLittleEndian(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET);
+    } else {
+      for (int i = 0; i < total; i += 1) {
+        c.putInt(rowId + i, buffer.getInt());
+      }
+    }
   }

   @Override
   public final void readLongs(int total, WritableColumnVector c, int rowId) {
-    c.putLongsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 8 * total;
+    int requiredBytes = total * 8;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    if (buffer.hasArray()) {
+      int offset = buffer.arrayOffset() + buffer.position();
+      c.putLongsLittleEndian(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET);
+    } else {
+      for (int i = 0; i < total; i += 1) {
+        c.putLong(rowId + i, buffer.getLong());
+      }
+    }
   }

   @Override
   public final void readFloats(int total, WritableColumnVector c, int rowId) {
-    c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 4 * total;
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    if (buffer.hasArray()) {
+      int offset = buffer.arrayOffset() + buffer.position();
+      c.putFloats(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET);
+    } else {
+      for (int i = 0; i < total; i += 1) {
+        c.putFloat(rowId + i, buffer.getFloat());
+      }
+    }
   }

   @Override
   public final void readDoubles(int total, WritableColumnVector c, int rowId) {
-    c.putDoubles(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 8 * total;
+    int requiredBytes = total * 8;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    if (buffer.hasArray()) {
+      int offset = buffer.arrayOffset() + buffer.position();
+      c.putDoubles(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET);
+    } else {
+      for (int i = 0; i < total; i += 1) {
+        c.putDouble(rowId + i, buffer.getDouble());
+      }
+    }
+  }
+
+  private byte getByte() {
+    try {
+      return (byte) in.read();
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read a byte", e);
+    }
   }

   @Override
   public final void readBytes(int total, WritableColumnVector c, int rowId) {
-    for (int i = 0; i < total; i++) {
-      // Bytes are stored as a 4-byte little endian int. Just read the first byte.
-      // TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
-      c.putByte(rowId + i, Platform.getByte(buffer, offset));
-      offset += 4;
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    for (int i = 0; i < total; i += 1) {
+      c.putByte(rowId + i, buffer.get());
+      // skip the next 3 bytes
+      buffer.position(buffer.position() + 3);
     }
   }

   @Override
   public final boolean readBoolean() {
-    byte b = Platform.getByte(buffer, offset);
-    boolean v = (b & (1 << bitOffset)) != 0;
+    // TODO: vectorize decoding and keep boolean[] instead of currentByte
+    if (bitOffset == 0) {
+      currentByte = getByte();
+    }
+
+    boolean v = (currentByte & (1 << bitOffset)) != 0;
     bitOffset += 1;
     if (bitOffset == 8) {
       bitOffset = 0;
-      offset++;
     }
     return v;
   }

   @Override
   public final int readInteger() {
-    int v = Platform.getInt(buffer, offset);
-    if (bigEndianPlatform) {
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user scottcarey commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r182161734

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
(quotes the same revision of the hunk shown in full above)
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181901031

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
(quotes the earlier revision of the hunk shown above, at `private byte getByte() {`)
--- End diff --

Is this used anywhere other than line 154? If not, can be inlined.
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181902045 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +59,159 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + buffer.position(); + c.putIntsLittleEndian(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET); +} else { + for (int i = 0; i < total; i += 1) { +c.putInt(rowId + i, buffer.getInt()); + } +} } @Override public final void readLongs(int total, WritableColumnVector c, int rowId) { -c.putLongsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 8 * total; +int requiredBytes = total * 8; +ByteBuffer buffer = getBuffer(requiredBytes); + +if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + buffer.position(); + c.putLongsLittleEndian(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET); +} else { + for (int i = 0; i < total; i += 1) { +c.putLong(rowId + i, buffer.getLong()); + } +} } @Override public final void readFloats(int total, WritableColumnVector c, int rowId) { -c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + buffer.position(); + c.putFloats(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET); +} else { + for (int i = 0; i < total; i += 1) { +c.putFloat(rowId + i, buffer.getFloat()); + } +} } @Override public final void readDoubles(int total, WritableColumnVector c, int rowId) { -c.putDoubles(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 8 * total; +int requiredBytes = total * 8; +ByteBuffer buffer = getBuffer(requiredBytes); + +if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + buffer.position(); + c.putDoubles(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET); +} else { + for (int i = 0; i < total; i += 1) { +c.putDouble(rowId + i, buffer.getDouble()); + } +} + } + + private byte getByte() { +try { + return (byte) in.read(); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read a byte", e); +} } @Override public final void readBytes(int total, WritableColumnVector c, int rowId) { -for (int i = 0; i < total; i++) { - // Bytes are stored as a 4-byte little endian int. Just read the first byte. - // TODO: consider pushing this in ColumnVector by adding a readBytes with a stride. - c.putByte(rowId + i, Platform.getByte(buffer, offset)); - offset += 4; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { + c.putByte(rowId + i, buffer.get()); --- End diff -- could you preserve the comment about "Bytes are stored as 4-byte little endian int. Just read the first byte."? 
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181899509 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { --- End diff -- Updated.
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181889287 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { --- End diff -- It seems worth it to me to be defensive against performance regressions - but feel free to punt it to me as a follow-on patch if you'd rather.
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181883674 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { --- End diff -- Yeah, exactly. We can detect that the buffer is backed by an array and use the other call. If we think this is worth it as a short-term fix, I'll update this PR.
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181882476 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { --- End diff -- Isn't that what `hasArray()` is for though? If the buffers are backed by a byte array, `hasArray()` returns true and accessing the byte array via `array()` should be 0 cost. (If `array()` actually copies any data, that would invalidate this line of reasoning but would also be unexpected.) So for example, here you'd have:

    public final void readIntegers(int total, WritableColumnVector c, int rowId) {
      int requiredBytes = total * 4;
      ByteBuffer buffer = getBuffer(requiredBytes);
      if (buffer.hasArray()) {
        c.putIntsLittleEndian(rowId, total, buffer.array(), 0);
      } else {
        for (int i = 0; i < total; i += 1) {
          c.putInt(rowId + i, buffer.getInt());
        }
      }
    }

This seems to be the same pattern that's in `readBinary()`, below. Let me know if I'm missing something!
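(One subtlety with this suggestion: `getBuffer` returns a slice, so when the slice is array-backed, logical position 0 of the buffer is not necessarily index 0 of the backing array. A bulk copy would need to start at the slice's offset into that array - which is what the later revision of the diff above does - roughly:)

    // start of this slice's data within the backing array
    int offset = buffer.arrayOffset() + buffer.position();
    c.putIntsLittleEndian(rowId, total, buffer.array(), offset);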
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181864492 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { --- End diff -- The reason I moved the loop out is that I didn't think using the byte[] API would actually be better. Parquet doesn't guarantee that these byte buffers are on the heap and backed by byte arrays, so we would need to copy the bytes out of the buffer into an array only to copy them again in the column vector call. Two copies (and possibly an allocation) seemed worse than moving the loop. We could catch the case where the buffers are on-heap and make the bulk call. The drawback is that this will be temporary and will be removed when ColumnVector supports ByteBuffer. And it only works (and matters) when Parquet uses on-heap buffers and Spark uses off-heap buffers. Is that worth the change to this PR? I can take a shot at it if you think it is. I'd rather update ColumnVector and then follow up though.
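(To make the trade-off concrete, the two copies described here - a direct, off-heap Parquet buffer feeding a bulk byte[] call - would look roughly like this hypothetical fallback; this is a sketch of the rejected path, not code from the PR:)

    // copy 1: drain the direct buffer into a temporary heap array
    byte[] tmp = new byte[requiredBytes];
    buffer.get(tmp);
    // copy 2: the column vector copies the array contents a second time
    c.putIntsLittleEndian(rowId, total, tmp, 0);

(The per-value `buffer.getInt()` loop avoids the temporary array and the extra copy entirely, which is why it was chosen as the interim approach.)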
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181846514 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { --- End diff -- Agreed that fixing the `ByteBuffer` / `ColumnVector` interaction should be dealt with elsewhere. I'm just raising the possibility of _regressing_ the read path here because the copies are less efficient. Since it's going to be a while before 2.4.0, that might be ok if we commit to fixing it - but it superficially seems like a manageable change to the PR since the code to call the bulk APIs is already there. What do you think?
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181826671 --- Diff: pom.xml --- @@ -129,7 +129,7 @@ <hive.version.short>1.2.1</hive.version.short> <derby.version>10.12.1.1</derby.version> -<parquet.version>1.8.2</parquet.version> +<parquet.version>1.10.0</parquet.version> --- End diff -- I excluded the commons-pool dependency from parquet-hadoop to avoid this. I also tested the latest Parquet release with commons-pool 1.5.4 and everything passes. I don't think it actually requires 1.6.
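(A sketch of what such an exclusion typically looks like in the root pom.xml - the exact coordinates and version property here are assumptions, not quoted from the PR:)

    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-hadoop</artifactId>
      <version>${parquet.version}</version>
      <exclusions>
        <!-- avoid pulling in a newer commons-pool than Spark already manages -->
        <exclusion>
          <groupId>commons-pool</groupId>
          <artifactId>commons-pool</artifactId>
        </exclusion>
      </exclusions>
    </dependency>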