[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-10-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18704


---




[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-10-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r142428792
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar.compression
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.columnar._
+import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+import org.apache.spark.sql.types.AtomicType
+
+class PassThroughSuite extends SparkFunSuite {
+  val nullValue = -1
+  testPassThrough(new ByteColumnStats, BYTE)
+  testPassThrough(new ShortColumnStats, SHORT)
+  testPassThrough(new IntColumnStats, INT)
+  testPassThrough(new LongColumnStats, LONG)
+  testPassThrough(new FloatColumnStats, FLOAT)
+  testPassThrough(new DoubleColumnStats, DOUBLE)
+
+  def testPassThrough[T <: AtomicType](
+columnStats: ColumnStats,
+columnType: NativeColumnType[T]) {
--- End diff --

nit: indentation is wrong here.
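
For reference, a sketch of the declaration with Spark's usual 4-space
continuation indent for multi-line parameter lists (assuming that is the
convention being asked for here):

    def testPassThrough[T <: AtomicType](
        columnStats: ColumnStats,
        columnType: NativeColumnType[T]) {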


---




[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-10-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r142428980
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala ---
@@ -21,19 +21,22 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.types.AtomicType
 
 class RunLengthEncodingSuite extends SparkFunSuite {
+  val nullValue = -1
   testRunLengthEncoding(new NoopColumnStats, BOOLEAN)
   testRunLengthEncoding(new ByteColumnStats, BYTE)
   testRunLengthEncoding(new ShortColumnStats, SHORT)
   testRunLengthEncoding(new IntColumnStats, INT)
   testRunLengthEncoding(new LongColumnStats, LONG)
-  testRunLengthEncoding(new StringColumnStats, STRING)
+  testRunLengthEncoding(new StringColumnStats, STRING, false)
 
   def testRunLengthEncoding[T <: AtomicType](
-  columnStats: ColumnStats,
-  columnType: NativeColumnType[T]) {
+  columnStats: ColumnStats,
+  columnType: NativeColumnType[T],
+  testDecompress: Boolean = true) {
--- End diff --

ditto


---




[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-09-19 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r139661951
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala ---
@@ -1311,4 +1314,172 @@ class ColumnarBatchSuite extends SparkFunSuite {
 batch.close()
 allocator.close()
   }
+
+  test("CachedBatch boolean Apis") {
--- End diff --

I see. Moved them into `ColumnVectorSuite`


---




[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-09-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r139605958
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala ---
@@ -1311,4 +1314,172 @@ class ColumnarBatchSuite extends SparkFunSuite {
 batch.close()
 allocator.close()
   }
+
+  test("CachedBatch boolean Apis") {
--- End diff --

move these to a new test suite


---




[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r139364602
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala ---
@@ -169,6 +267,125 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
 }
 
 override def hasNext: Boolean = valueCount < run || buffer.hasRemaining
+
+override def decompress(columnVector: WritableColumnVector, capacity: Int): Unit = {
+  val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder())
+  nullsBuffer.rewind()
+  val nullCount = ByteBufferHelper.getInt(nullsBuffer)
+  var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1
+  var pos = 0
+  var seenNulls = 0
+  var runLocal = 0
+  var valueCountLocal = 0
+  columnType.dataType match {
+case _: BooleanType =>
--- End diff --

same here, can we reduce code duplication?


---




[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r139362028
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala ---
@@ -17,8 +17,11 @@
 
 package org.apache.spark.sql.execution.columnar.compression
 
+import java.nio.ByteBuffer
--- End diff --

unnecessary import


---




[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-09-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r139361487
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java ---
@@ -147,6 +147,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
   public abstract void putShorts(int rowId, int count, short[] src, int srcIndex);
 
   /**
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
--- End diff --

let's update them in this PR. BTW, `WritableColumnVector` may be exposed to
end users so that they can build columnar batches for data source v2 columnar
scans, so the documentation is very important.


---




[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-09-14 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r138838983
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala ---
@@ -61,6 +63,162 @@ private[columnar] case object PassThrough extends CompressionScheme {
 }
 
 override def hasNext: Boolean = buffer.hasRemaining
+
+override def decompress(columnVector: WritableColumnVector, capacity: Int): Unit = {
+  val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder())
+  nullsBuffer.rewind()
+  val nullCount = ByteBufferHelper.getInt(nullsBuffer)
+  var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else capacity
+  var pos = 0
+  var seenNulls = 0
+  val srcArray = buffer.array
+  var bufferPos = buffer.position
+  columnType.dataType match {
+case _: BooleanType =>
+  val unitSize = 1
+  while (pos < capacity) {
+if (pos != nextNullIndex) {
+  val len = nextNullIndex - pos
+  assert(len * unitSize < Int.MaxValue)
+  for (i <- 0 until len) {
+val value = buffer.get(bufferPos + i) != 0
+columnVector.putBoolean(pos + i, value)
+  }
+  bufferPos += len
+  pos += len
+} else {
+  seenNulls += 1
+  nextNullIndex = if (seenNulls < nullCount) {
+ByteBufferHelper.getInt(nullsBuffer)
+  } else {
+capacity
+  }
+  columnVector.putNull(pos)
+  pos += 1
+}
+  }
+case _: ByteType =>
--- End diff --

Removed code duplication by using a function object. How about this?
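
For readers following along, a minimal sketch of the function-object idea
(`decompressWith` and its `put` parameter are illustrative names, not
necessarily the PR's exact helpers): the `dataType` match shrinks to picking
a bulk-copy function once, and a single shared loop owns the null bookkeeping.

    import java.nio.ByteBuffer

    import org.apache.spark.sql.execution.vectorized.WritableColumnVector

    // Shared decompress loop: all null handling lives here, once.
    // put(rowId, bufferOffset, count) performs the type-specific bulk copy.
    def decompressWith(
        columnVector: WritableColumnVector,
        capacity: Int,
        unitSize: Int,
        startBufferPos: Int,
        nullsBuffer: ByteBuffer,
        nullCount: Int,
        put: (Int, Int, Int) => Unit): Unit = {
      var nextNullIndex =
        if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else capacity
      var pos = 0
      var seenNulls = 0
      var bufferPos = startBufferPos
      while (pos < capacity) {
        if (pos != nextNullIndex) {
          val len = nextNullIndex - pos
          put(pos, bufferPos, len)   // type-specific bulk copy of len values
          bufferPos += len * unitSize
          pos += len
        } else {
          seenNulls += 1
          nextNullIndex = if (seenNulls < nullCount) {
            ByteBufferHelper.getInt(nullsBuffer)
          } else {
            capacity
          }
          columnVector.putNull(pos)
          pos += 1
        }
      }
    }

Each case in the match then collapses to one call, e.g. for INT (assuming the
byte[]-based putInts overload this PR adds alongside putShorts):

    case _: IntegerType =>
      decompressWith(columnVector, capacity, 4, buffer.position, nullsBuffer,
        nullCount, (rowId, off, n) => columnVector.putInts(rowId, n, buffer.array, off))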


---




[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-09-14 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r138838838
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar;
+
+import org.apache.spark.sql.execution.vectorized.Dictionary;
+
+public final class ColumnDictionary implements Dictionary {
+  private Object[] dictionary;
+
+  public ColumnDictionary(Object[] dictionary) {
+this.dictionary = dictionary;
+  }
+
+  @Override
+  public int decodeToInt(int id) {
+return (Integer)dictionary[id];
--- End diff --

Yeah, I removed boxing.
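
For context, the boxing-free shape looks roughly like this (sketched in Scala
for brevity, although the real class is Java, and `IntColumnDictionary` is an
illustrative name): one primitive array member per supported type, so decoding
is a direct array load with no `Integer` unboxing on the hot path.

    import org.apache.spark.sql.execution.vectorized.Dictionary

    class IntColumnDictionary(values: Array[Int]) extends Dictionary {
      // Direct primitive load: no Object[] indirection, no unboxing.
      override def decodeToInt(id: Int): Int = values(id)
      override def decodeToLong(id: Int): Long =
        throw new UnsupportedOperationException("int dictionary only")
      override def decodeToFloat(id: Int): Float =
        throw new UnsupportedOperationException("int dictionary only")
      override def decodeToDouble(id: Int): Double =
        throw new UnsupportedOperationException("int dictionary only")
      override def decodeToBinary(id: Int): Array[Byte] =
        throw new UnsupportedOperationException("int dictionary only")
    }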


---




[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-09-14 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r138838744
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala ---
@@ -149,4 +153,23 @@ private[columnar] object ColumnAccessor {
 throw new Exception(s"not support type: $other")
 }
   }
+
+  def decompress(columnAccessor: ColumnAccessor, columnVector: WritableColumnVector, numRows: Int):
+  Unit = {
+if (columnAccessor.isInstanceOf[NativeColumnAccessor[_]]) {
+  val nativeAccessor = columnAccessor.asInstanceOf[NativeColumnAccessor[_]]
+  nativeAccessor.decompress(columnVector, numRows)
+} else {
+  val dataBuffer = columnAccessor.asInstanceOf[BasicColumnAccessor[_]].getByteBuffer
+  val nullsBuffer = dataBuffer.duplicate().order(ByteOrder.nativeOrder())
+  nullsBuffer.rewind()
+
+  val numNulls = ByteBufferHelper.getInt(nullsBuffer)
+  for (i <- 0 until numNulls) {
+val cordinal = ByteBufferHelper.getInt(nullsBuffer)
--- End diff --

good catch, done


---




[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-09-14 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r138838698
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala ---
@@ -149,4 +153,23 @@ private[columnar] object ColumnAccessor {
 throw new Exception(s"not support type: $other")
 }
   }
+
+  def decompress(columnAccessor: ColumnAccessor, columnVector: WritableColumnVector, numRows: Int):
+  Unit = {
+if (columnAccessor.isInstanceOf[NativeColumnAccessor[_]]) {
+  val nativeAccessor = columnAccessor.asInstanceOf[NativeColumnAccessor[_]]
+  nativeAccessor.decompress(columnVector, numRows)
+} else {
+  val dataBuffer = columnAccessor.asInstanceOf[BasicColumnAccessor[_]].getByteBuffer
+  val nullsBuffer = dataBuffer.duplicate().order(ByteOrder.nativeOrder())
+  nullsBuffer.rewind()
+
+  val numNulls = ByteBufferHelper.getInt(nullsBuffer)
+  for (i <- 0 until numNulls) {
+val cordinal = ByteBufferHelper.getInt(nullsBuffer)
+columnVector.putNull(cordinal)
+  }
+  throw new RuntimeException("Not support non-primitive type now")
--- End diff --

thanks, fixed.


---




[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-09-12 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r138409856
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java ---
@@ -147,6 +147,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
   public abstract void putShorts(int rowId, int count, short[] src, int srcIndex);
 
   /**
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
--- End diff --

@ueshin The comment at line 145 may be mistaken: `Sets values from [rowId,
rowId + count) to [src + srcIndex, src + srcIndex + count)`.
It should be `Sets values from [src + srcIndex, src + srcIndex + count) to
[rowId, rowId + count)`.
What do you think?

If so, should we update them in this PR, or is it better to create another
PR?


---




[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-09-12 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r138409265
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java ---
@@ -147,6 +147,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
   public abstract void putShorts(int rowId, int count, short[] src, int srcIndex);
 
   /**
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
--- End diff --

I see.


---




[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-09-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r138364852
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala ---
@@ -149,4 +153,23 @@ private[columnar] object ColumnAccessor {
 throw new Exception(s"not support type: $other")
 }
   }
+
+  def decompress(columnAccessor: ColumnAccessor, columnVector: WritableColumnVector, numRows: Int):
+  Unit = {
+if (columnAccessor.isInstanceOf[NativeColumnAccessor[_]]) {
+  val nativeAccessor = columnAccessor.asInstanceOf[NativeColumnAccessor[_]]
+  nativeAccessor.decompress(columnVector, numRows)
+} else {
+  val dataBuffer = columnAccessor.asInstanceOf[BasicColumnAccessor[_]].getByteBuffer
+  val nullsBuffer = dataBuffer.duplicate().order(ByteOrder.nativeOrder())
+  nullsBuffer.rewind()
+
+  val numNulls = ByteBufferHelper.getInt(nullsBuffer)
+  for (i <- 0 until numNulls) {
+val cordinal = ByteBufferHelper.getInt(nullsBuffer)
--- End diff --

typo? `ordinal`?


---




[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-09-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r138363787
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java ---
@@ -147,6 +147,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
   public abstract void putShorts(int rowId, int count, short[] src, int srcIndex);
 
   /**
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
--- End diff --

This description is a little vague, as the input data is `byte[]`. Can we
say more about this, e.g. endianness?
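
One possible wording that pins down both the copy direction and the byte
interpretation (a suggestion only, not necessarily the PR's final text):

    /**
     * Sets values [rowId, rowId + count) in this column vector to the `count`
     * short values encoded in `src`, starting at byte offset `srcIndex`. The
     * bytes in `src` are read in platform-native byte order.
     */
    public abstract void putShorts(int rowId, int count, byte[] src, int srcIndex);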


---




[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-09-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r138366156
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala ---
@@ -61,6 +63,162 @@ private[columnar] case object PassThrough extends CompressionScheme {
 }
 
 override def hasNext: Boolean = buffer.hasRemaining
+
+override def decompress(columnVector: WritableColumnVector, capacity: Int): Unit = {
+  val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder())
+  nullsBuffer.rewind()
+  val nullCount = ByteBufferHelper.getInt(nullsBuffer)
+  var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else capacity
+  var pos = 0
+  var seenNulls = 0
+  val srcArray = buffer.array
+  var bufferPos = buffer.position
+  columnType.dataType match {
+case _: BooleanType =>
+  val unitSize = 1
+  while (pos < capacity) {
+if (pos != nextNullIndex) {
+  val len = nextNullIndex - pos
+  assert(len * unitSize < Int.MaxValue)
+  for (i <- 0 until len) {
+val value = buffer.get(bufferPos + i) != 0
+columnVector.putBoolean(pos + i, value)
+  }
+  bufferPos += len
+  pos += len
+} else {
+  seenNulls += 1
+  nextNullIndex = if (seenNulls < nullCount) {
+ByteBufferHelper.getInt(nullsBuffer)
+  } else {
+capacity
+  }
+  columnVector.putNull(pos)
+  pos += 1
+}
+  }
+case _: ByteType =>
--- End diff --

hmmm, is there any way to reduce the code duplication? maybe codegen?


---




[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-09-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r138365192
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala ---
@@ -149,4 +153,23 @@ private[columnar] object ColumnAccessor {
 throw new Exception(s"not support type: $other")
 }
   }
+
+  def decompress(columnAccessor: ColumnAccessor, columnVector: WritableColumnVector, numRows: Int):
+  Unit = {
+if (columnAccessor.isInstanceOf[NativeColumnAccessor[_]]) {
+  val nativeAccessor = columnAccessor.asInstanceOf[NativeColumnAccessor[_]]
+  nativeAccessor.decompress(columnVector, numRows)
+} else {
+  val dataBuffer = columnAccessor.asInstanceOf[BasicColumnAccessor[_]].getByteBuffer
+  val nullsBuffer = dataBuffer.duplicate().order(ByteOrder.nativeOrder())
+  nullsBuffer.rewind()
+
+  val numNulls = ByteBufferHelper.getInt(nullsBuffer)
+  for (i <- 0 until numNulls) {
+val cordinal = ByteBufferHelper.getInt(nullsBuffer)
+columnVector.putNull(cordinal)
+  }
+  throw new RuntimeException("Not support non-primitive type now")
--- End diff --

If we need to throw an exception at the end anyway, why not do it at the
beginning?
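
A sketch of the suggested shape (a pattern match also drops the
isInstanceOf/asInstanceOf pair): reject unsupported accessors before touching
the nulls buffer at all.

    def decompress(
        columnAccessor: ColumnAccessor,
        columnVector: WritableColumnVector,
        numRows: Int): Unit = columnAccessor match {
      case nativeAccessor: NativeColumnAccessor[_] =>
        nativeAccessor.decompress(columnVector, numRows)
      case _ =>
        // Fail fast instead of writing nulls and then throwing.
        throw new RuntimeException("Not support non-primitive type now")
    }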


---




[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-09-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r138363222
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar;
+
+import org.apache.spark.sql.execution.vectorized.Dictionary;
+
+public final class ColumnDictionary implements Dictionary {
+  private Object[] dictionary;
+
+  public ColumnDictionary(Object[] dictionary) {
+this.dictionary = dictionary;
+  }
+
+  @Override
+  public int decodeToInt(int id) {
+return (Integer)dictionary[id];
--- End diff --

is it possible to avoid boxing here? e.g. we could keep a set of primitive
array members instead of the `Object[]`.


---




[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-08-24 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r135066290
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala ---
@@ -278,6 +555,46 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme {
 }
 
 override def hasNext: Boolean = buffer.hasRemaining
+
+override def decompress(columnVector: ColumnVector, capacity: Int): Unit = {
+  val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder())
+  nullsBuffer.rewind()
+  val nullCount = ByteBufferHelper.getInt(nullsBuffer)
+  var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1
+  var pos = 0
+  var seenNulls = 0
+  columnType.dataType match {
+case _: IntegerType =>
+  while (pos < capacity) {
+if (pos != nextNullIndex) {
+  val value = dictionary(buffer.getShort()).asInstanceOf[Int]
+  columnVector.putInt(pos, value)
--- End diff --

done


---



[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-08-24 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r135066268
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java ---
@@ -433,6 +433,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
   public abstract void putShorts(int rowId, int count, short[] src, int srcIndex);
 
   /**
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
+   */
+  public abstract void putShorts(int rowId, int count, byte[] src, int srcIndex);
--- End diff --

done


---



[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-08-24 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r135056371
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java ---
@@ -433,6 +433,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
   public abstract void putShorts(int rowId, int count, short[] src, int srcIndex);
 
   /**
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
+   */
+  public abstract void putShorts(int rowId, int count, byte[] src, int srcIndex);
--- End diff --

Got it. Rebased in my local version.


---



[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-08-24 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r135056247
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala ---
@@ -278,6 +555,46 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme {
 }
 
 override def hasNext: Boolean = buffer.hasRemaining
+
+override def decompress(columnVector: ColumnVector, capacity: Int): Unit = {
+  val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder())
+  nullsBuffer.rewind()
+  val nullCount = ByteBufferHelper.getInt(nullsBuffer)
+  var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1
+  var pos = 0
+  var seenNulls = 0
+  columnType.dataType match {
+case _: IntegerType =>
+  while (pos < capacity) {
+if (pos != nextNullIndex) {
+  val value = dictionary(buffer.getShort()).asInstanceOf[Int]
+  columnVector.putInt(pos, value)
--- End diff --

Sure, I will do that.


---



[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-08-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r135043913
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala ---
@@ -278,6 +555,46 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme {
 }
 
 override def hasNext: Boolean = buffer.hasRemaining
+
+override def decompress(columnVector: ColumnVector, capacity: Int): Unit = {
+  val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder())
+  nullsBuffer.rewind()
+  val nullCount = ByteBufferHelper.getInt(nullsBuffer)
+  var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1
+  var pos = 0
+  var seenNulls = 0
+  columnType.dataType match {
+case _: IntegerType =>
+  while (pos < capacity) {
+if (pos != nextNullIndex) {
+  val value = dictionary(buffer.getShort()).asInstanceOf[Int]
+  columnVector.putInt(pos, value)
--- End diff --

can we delay the decompression and set the dictionary on the `ColumnVector`?
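
A sketch of the lazy alternative (assuming the setDictionary /
reserveDictionaryIds API that the vectorized Parquet reader uses; the
`ColumnDictionary(dictionary)` constructor is illustrative): decompress writes
only the 2-byte dictionary ids, and values are materialized on access.

    override def decompress(columnVector: ColumnVector, capacity: Int): Unit = {
      val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder())
      nullsBuffer.rewind()
      val nullCount = ByteBufferHelper.getInt(nullsBuffer)
      var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1
      var seenNulls = 0
      // Hand the dictionary itself to the vector instead of expanding values.
      columnVector.setDictionary(new ColumnDictionary(dictionary))
      val dictionaryIds = columnVector.reserveDictionaryIds(capacity)
      var pos = 0
      while (pos < capacity) {
        if (pos != nextNullIndex) {
          dictionaryIds.putInt(pos, buffer.getShort().toInt)  // id only, no value copy
        } else {
          seenNulls += 1
          nextNullIndex =
            if (seenNulls < nullCount) ByteBufferHelper.getInt(nullsBuffer) else -1
          columnVector.putNull(pos)
        }
        pos += 1
      }
    }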


---



[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...

2017-08-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18704#discussion_r135042826
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java ---
@@ -433,6 +433,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
   public abstract void putShorts(int rowId, int count, short[] src, int srcIndex);
 
   /**
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
+   */
+  public abstract void putShorts(int rowId, int count, byte[] src, int srcIndex);
--- End diff --

now we can move them into `WritableColumnVector`


---