Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/18704#discussion_r138838983 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala --- @@ -61,6 +63,162 @@ private[columnar] case object PassThrough extends CompressionScheme { } override def hasNext: Boolean = buffer.hasRemaining + + override def decompress(columnVector: WritableColumnVector, capacity: Int): Unit = { + val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder()) + nullsBuffer.rewind() + val nullCount = ByteBufferHelper.getInt(nullsBuffer) + var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else capacity + var pos = 0 + var seenNulls = 0 + val srcArray = buffer.array + var bufferPos = buffer.position + columnType.dataType match { + case _: BooleanType => + val unitSize = 1 + while (pos < capacity) { + if (pos != nextNullIndex) { + val len = nextNullIndex - pos + assert(len * unitSize < Int.MaxValue) + for (i <- 0 until len) { + val value = buffer.get(bufferPos + i) != 0 + columnVector.putBoolean(pos + i, value) + } + bufferPos += len + pos += len + } else { + seenNulls += 1 + nextNullIndex = if (seenNulls < nullCount) { + ByteBufferHelper.getInt(nullsBuffer) + } else { + capacity + } + columnVector.putNull(pos) + pos += 1 + } + } + case _: ByteType => --- End diff -- Removed code duplication by using a function object. How about this?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org