[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18958


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134780644
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -505,18 +511,12 @@ public void filterNullsInColumn(int ordinal) {
 nullFilteredColumns.add(ordinal);
   }
 
-  private ColumnarBatch(StructType schema, int maxRows, MemoryMode memMode) {
+  public ColumnarBatch(StructType schema, ColumnVector[] columns, int capacity) {
--- End diff --

we can add a new constructor that accepts `WritableColumnVector[]`?
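A minimal sketch of what such an overload might look like, using simplified stand-in classes rather than Spark's real `ColumnarBatch` (the field names and constructor shapes here are assumptions for illustration only). Because Java arrays are covariant, the writable-typed constructor can store the same array in both fields:

```java
// Stand-ins for the Spark classes discussed above; not the real API.
abstract class ColumnVector { }
abstract class WritableColumnVector extends ColumnVector { }

class ColumnarBatch {
  final ColumnVector[] columns;
  // Non-null only when the batch was built from writable vectors.
  final WritableColumnVector[] writableColumns;
  final int capacity;

  // Read-only path: any ColumnVector implementation is accepted.
  ColumnarBatch(ColumnVector[] columns, int capacity) {
    this.columns = columns;
    this.writableColumns = null;
    this.capacity = capacity;
  }

  // Suggested overload: callers that allocate writable vectors keep the
  // stronger static type, so no per-row cast is needed later.
  ColumnarBatch(WritableColumnVector[] columns, int capacity) {
    this.columns = columns;  // array covariance: assigns directly
    this.writableColumns = columns;
    this.capacity = capacity;
  }
}
```

Overload resolution picks the `WritableColumnVector[]` constructor automatically when the caller's array has that static type.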





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134780277
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -95,19 +81,32 @@ public void close() {
 private final ColumnarBatch parent;
 private final int fixedLenRowSize;
 private final ColumnVector[] columns;
+private final WritableColumnVector[] writableColumns;
--- End diff --

can we move this to `ColumnarBatch` instead of `Row`?





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134779475
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -307,64 +306,69 @@ public void update(int ordinal, Object value) {
 
 @Override
 public void setNullAt(int ordinal) {
-  assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNull(rowId);
+  getColumnAsWritable(ordinal).putNull(rowId);
 }
 
 @Override
 public void setBoolean(int ordinal, boolean value) {
-  assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNotNull(rowId);
-  columns[ordinal].putBoolean(rowId, value);
+  WritableColumnVector column = getColumnAsWritable(ordinal);
+  column.putNotNull(rowId);
+  column.putBoolean(rowId, value);
 }
 
 @Override
 public void setByte(int ordinal, byte value) {
-  assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNotNull(rowId);
-  columns[ordinal].putByte(rowId, value);
+  WritableColumnVector column = getColumnAsWritable(ordinal);
+  column.putNotNull(rowId);
+  column.putByte(rowId, value);
 }
 
 @Override
 public void setShort(int ordinal, short value) {
-  assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNotNull(rowId);
-  columns[ordinal].putShort(rowId, value);
+  WritableColumnVector column = getColumnAsWritable(ordinal);
+  column.putNotNull(rowId);
+  column.putShort(rowId, value);
 }
 
 @Override
 public void setInt(int ordinal, int value) {
-  assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNotNull(rowId);
-  columns[ordinal].putInt(rowId, value);
+  WritableColumnVector column = getColumnAsWritable(ordinal);
+  column.putNotNull(rowId);
+  column.putInt(rowId, value);
 }
 
 @Override
 public void setLong(int ordinal, long value) {
-  assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNotNull(rowId);
-  columns[ordinal].putLong(rowId, value);
+  WritableColumnVector column = getColumnAsWritable(ordinal);
+  column.putNotNull(rowId);
+  column.putLong(rowId, value);
 }
 
 @Override
 public void setFloat(int ordinal, float value) {
-  assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNotNull(rowId);
-  columns[ordinal].putFloat(rowId, value);
+  WritableColumnVector column = getColumnAsWritable(ordinal);
+  column.putNotNull(rowId);
+  column.putFloat(rowId, value);
 }
 
 @Override
 public void setDouble(int ordinal, double value) {
-  assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNotNull(rowId);
-  columns[ordinal].putDouble(rowId, value);
+  WritableColumnVector column = getColumnAsWritable(ordinal);
+  column.putNotNull(rowId);
+  column.putDouble(rowId, value);
 }
 
 @Override
 public void setDecimal(int ordinal, Decimal value, int precision) {
-  assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNotNull(rowId);
-  columns[ordinal].putDecimal(rowId, value, precision);
+  WritableColumnVector column = getColumnAsWritable(ordinal);
+  column.putNotNull(rowId);
+  column.putDecimal(rowId, value, precision);
+}
+
+private WritableColumnVector getColumnAsWritable(int ordinal) {
--- End diff --

nit: `getWritableColumn`





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-22 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134399222
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -307,64 +293,69 @@ public void update(int ordinal, Object value) {
 
 @Override
 public void setNullAt(int ordinal) {
-  assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNull(rowId);
+  getColumnAsMutable(ordinal).putNull(rowId);
 }
 
 @Override
 public void setBoolean(int ordinal, boolean value) {
-  assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNotNull(rowId);
-  columns[ordinal].putBoolean(rowId, value);
+  MutableColumnVector column = getColumnAsMutable(ordinal);
+  column.putNotNull(rowId);
+  column.putBoolean(rowId, value);
 }
 
 @Override
 public void setByte(int ordinal, byte value) {
-  assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNotNull(rowId);
-  columns[ordinal].putByte(rowId, value);
+  MutableColumnVector column = getColumnAsMutable(ordinal);
--- End diff --

@ueshin @cloud-fan 
Since the `MutableColumnVector` assigned to each column in `ColumnarBatch` never changes, we can build an array of `MutableColumnVector` by applying the cast once at initialization. If a cast exception occurs, we can ignore it, since that column will never call the setter APIs. Then each setter simply refers to an element of the array.
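A toy sketch of this caching idea, with stand-in classes rather than Spark's real ones: the writable references are resolved once in the constructor, so the hot-path setters do a plain array load instead of a per-call cast.

```java
// Simplified stand-ins for the classes under discussion.
abstract class ColumnVector { }

abstract class MutableColumnVector extends ColumnVector {
  abstract void putInt(int rowId, int value);
}

class OnHeapIntVector extends MutableColumnVector {
  final int[] data = new int[16];
  @Override void putInt(int rowId, int value) { data[rowId] = value; }
}

class Row {
  // Resolved once at construction; null entries mark read-only columns.
  private final MutableColumnVector[] mutableColumns;
  private final int rowId = 0;

  Row(ColumnVector[] columns) {
    mutableColumns = new MutableColumnVector[columns.length];
    for (int i = 0; i < columns.length; i++) {
      if (columns[i] instanceof MutableColumnVector) {
        mutableColumns[i] = (MutableColumnVector) columns[i];
      }
      // Otherwise leave null: callers must never invoke a setter on that
      // ordinal, mirroring the "ignore the cast failure" idea above.
    }
  }

  void setInt(int ordinal, int value) {
    // Hot path: plain array load, no per-call checkcast.
    mutableColumns[ordinal].putInt(rowId, value);
  }
}
```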






[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134396135
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -307,64 +293,70 @@ public void update(int ordinal, Object value) {
 
 @Override
 public void setNullAt(int ordinal) {
--- End diff --

oh, then we really need to think about how to eliminate the per-call type cast...





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-22 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134393145
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -307,64 +293,70 @@ public void update(int ordinal, Object value) {
 
 @Override
 public void setNullAt(int ordinal) {
--- End diff --

It seems like the rows returned by `ColumnarBatch.rowIterator` don't need to be mutable with our current tests, but `ColumnarBatch.Row` itself still needs to be mutable, since its write APIs are used in `HashAggregateExec`.





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134232538
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
 ---
@@ -970,30 +458,14 @@ public final int appendStruct(boolean isNull) {
   protected boolean anyNullsSet;
 
   /**
-   * True if this column's values are fixed. This means the column values 
never change, even
-   * across resets.
-   */
-  protected boolean isConstant;
-
-  /**
-   * Default size of each array length value. This grows as necessary.
-   */
-  protected static final int DEFAULT_ARRAY_LENGTH = 4;
-
-  /**
-   * Current write cursor (row index) when appending data.
-   */
-  protected int elementsAppended;
-
-  /**
* If this is a nested type (array or struct), the column for the child 
data.
*/
   protected ColumnVector[] childColumns;
--- End diff --

yea, because the child columns are mostly of the same concrete column vector type as the parent.





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-21 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134208734
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
 ---
@@ -0,0 +1,653 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * This class adds write APIs to ColumnVector.
+ * It supports all the types and contains put APIs as well as their 
batched versions.
+ * The batched versions are preferable whenever possible.
+ *
+ * Capacity: The data stored is dense but the arrays are not fixed 
capacity. It is the
+ * responsibility of the caller to call reserve() to ensure there is 
enough room before adding
+ * elements. This means that the put() APIs do not check as in common 
cases (i.e. flat schemas),
+ * the lengths are known up front.
+ *
+ * A ColumnVector should be considered immutable once originally created. 
In other words, it is not
+ * valid to call put APIs after reads until reset() is called.
+ */
+public abstract class WritableColumnVector extends ColumnVector {
+
+  /**
+   * Resets this column for writing. The currently stored values are no 
longer accessible.
+   */
+  public void reset() {
+if (isConstant) return;
+
+if (childColumns != null) {
+  for (ColumnVector c: childColumns) {
+((WritableColumnVector) c).reset();
+  }
+}
+numNulls = 0;
+elementsAppended = 0;
+if (anyNullsSet) {
+  putNotNulls(0, capacity);
+  anyNullsSet = false;
+}
+  }
+
+  public void reserve(int requiredCapacity) {
+if (requiredCapacity > capacity) {
+  int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 
2L);
+  if (requiredCapacity <= newCapacity) {
+try {
+  reserveInternal(newCapacity);
+} catch (OutOfMemoryError outOfMemoryError) {
+  throwUnsupportedException(requiredCapacity, outOfMemoryError);
+}
+  } else {
+throwUnsupportedException(requiredCapacity, null);
+  }
+}
+  }
+
+  private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
+    String message = "Cannot reserve additional contiguous bytes in the vectorized reader " +
+        "(requested = " + requiredCapacity + " bytes). As a workaround, you can disable the " +
+        "vectorized reader by setting " + SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() +
+        " to false.";
+    throw new RuntimeException(message, cause);
+  }
+
+  /**
+   * Ensures that there is enough storage to store capacity elements. That 
is, the put() APIs
+   * must work for all rowIds < capacity.
+   */
+  protected abstract void reserveInternal(int capacity);
+
+  /**
+   * Sets the value at rowId to null/not null.
+   */
+  public abstract void putNotNull(int rowId);
+  public abstract void putNull(int rowId);
+
+  /**
+   * Sets the values from [rowId, rowId + count) to null/not null.
+   */
+  public abstract void putNulls(int rowId, int count);
+  public abstract void putNotNulls(int rowId, int count);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract void putBoolean(int rowId, boolean value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putBooleans(int rowId, int count, boolean value);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract void putByte(int rowId, byte value);
+
+  

[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-20 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134148287
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
 ---
@@ -970,30 +458,14 @@ public final int appendStruct(boolean isNull) {
   protected boolean anyNullsSet;
 
   /**
-   * True if this column's values are fixed. This means the column values 
never change, even
-   * across resets.
-   */
-  protected boolean isConstant;
-
-  /**
-   * Default size of each array length value. This grows as necessary.
-   */
-  protected static final int DEFAULT_ARRAY_LENGTH = 4;
-
-  /**
-   * Current write cursor (row index) when appending data.
-   */
-  protected int elementsAppended;
-
-  /**
* If this is a nested type (array or struct), the column for the child 
data.
*/
   protected ColumnVector[] childColumns;
--- End diff --

We need this field for `ArrowColumnVector` to store its child columns, too.
Do you want to make the method `getChildColumn(int ordinal)` abstract and move the field into the concrete classes so they manage it themselves?





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134147099
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -307,64 +293,70 @@ public void update(int ordinal, Object value) {
 
 @Override
 public void setNullAt(int ordinal) {
--- End diff --

one question: do the rows returned by `ColumnarBatch.rowIterator` have to be mutable?





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134146811
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
 ---
@@ -970,30 +458,14 @@ public final int appendStruct(boolean isNull) {
   protected boolean anyNullsSet;
 
   /**
-   * True if this column's values are fixed. This means the column values 
never change, even
-   * across resets.
-   */
-  protected boolean isConstant;
-
-  /**
-   * Default size of each array length value. This grows as necessary.
-   */
-  protected static final int DEFAULT_ARRAY_LENGTH = 4;
-
-  /**
-   * Current write cursor (row index) when appending data.
-   */
-  protected int elementsAppended;
-
-  /**
* If this is a nested type (array or struct), the column for the child 
data.
*/
   protected ColumnVector[] childColumns;
--- End diff --

can we move this to `WritableColumnVector`? I think `ColumnVector` only needs `ColumnVector getChildColumn(int ordinal)`, and `WritableColumnVector` can override it with the covariant return type `WritableColumnVector getChildColumn(int ordinal)`.
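Java has supported covariant return types since Java 5, so this refactoring type-checks. A minimal stand-in sketch (class and field names mirror the discussion but are not Spark's actual code):

```java
// Stand-ins for the classes under discussion, not Spark's real API.
abstract class ColumnVector {
  // The read-only base class exposes only the accessor.
  public abstract ColumnVector getChildColumn(int ordinal);
}

abstract class WritableColumnVector extends ColumnVector {
  // The child-column storage moves into the writable subclass,
  // typed with the stronger element type.
  protected WritableColumnVector[] childColumns;

  // Covariant override: callers holding a WritableColumnVector get a
  // WritableColumnVector back with no cast at the call site.
  @Override
  public WritableColumnVector getChildColumn(int ordinal) {
    return childColumns[ordinal];
  }
}
```

Readers that only hold the base `ColumnVector` type still see the plain `ColumnVector` return, so the read path is unchanged.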





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-20 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134145753
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -307,64 +293,69 @@ public void update(int ordinal, Object value) {
 
 @Override
 public void setNullAt(int ordinal) {
-  assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNull(rowId);
+  getColumnAsMutable(ordinal).putNull(rowId);
 }
 
 @Override
 public void setBoolean(int ordinal, boolean value) {
-  assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNotNull(rowId);
-  columns[ordinal].putBoolean(rowId, value);
+  MutableColumnVector column = getColumnAsMutable(ordinal);
+  column.putNotNull(rowId);
+  column.putBoolean(rowId, value);
 }
 
 @Override
 public void setByte(int ordinal, byte value) {
-  assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNotNull(rowId);
-  columns[ordinal].putByte(rowId, value);
+  MutableColumnVector column = getColumnAsMutable(ordinal);
--- End diff --

In my understanding, the cast still occurs at runtime. The cast operation may consist of a compare and a branch. I am thinking about how we can reduce the cost of these operations.





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134115047
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -307,64 +293,69 @@ public void update(int ordinal, Object value) {
 
 @Override
 public void setNullAt(int ordinal) {
-  assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNull(rowId);
+  getColumnAsMutable(ordinal).putNull(rowId);
 }
 
 @Override
 public void setBoolean(int ordinal, boolean value) {
-  assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNotNull(rowId);
-  columns[ordinal].putBoolean(rowId, value);
+  MutableColumnVector column = getColumnAsMutable(ordinal);
+  column.putNotNull(rowId);
+  column.putBoolean(rowId, value);
 }
 
 @Override
 public void setByte(int ordinal, byte value) {
-  assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNotNull(rowId);
-  columns[ordinal].putByte(rowId, value);
+  MutableColumnVector column = getColumnAsMutable(ordinal);
--- End diff --

I'm a little afraid about this per-call type cast, but JVM should be able 
to optimize it perfectly, cc @kiszk 





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134114861
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnVector.java
 ---
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * This class adds write APIs to ColumnVector.
+ * It supports all the types and contains put APIs as well as their 
batched versions.
+ * The batched versions are preferable whenever possible.
+ *
+ * Capacity: The data stored is dense but the arrays are not fixed 
capacity. It is the
+ * responsibility of the caller to call reserve() to ensure there is 
enough room before adding
+ * elements. This means that the put() APIs do not check as in common 
cases (i.e. flat schemas),
+ * the lengths are known up front.
+ *
+ * A ColumnVector should be considered immutable once originally created. 
In other words, it is not
--- End diff --

How about `WritableColumnVector`?





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134114822
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -307,64 +293,69 @@ public void update(int ordinal, Object value) {
 
 @Override
 public void setNullAt(int ordinal) {
--- End diff --

just out of curiosity, will we really update the rows returned by `ColumnarBatch.rowIterator`?





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134114762
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
 ---
@@ -1049,43 +511,9 @@ public ColumnVector getDictionaryIds() {
* Sets up the common state and also handles creating the child columns 
if this is a nested
* type.
*/
-  protected ColumnVector(int capacity, DataType type, MemoryMode memMode) {
+  protected ColumnVector(int capacity, DataType type) {
 this.capacity = capacity;
 this.type = type;
-
-if (type instanceof ArrayType || type instanceof BinaryType || type 
instanceof StringType
-|| DecimalType.isByteArrayDecimalType(type)) {
-  DataType childType;
-  int childCapacity = capacity;
-  if (type instanceof ArrayType) {
-childType = ((ArrayType)type).elementType();
-  } else {
-childType = DataTypes.ByteType;
-childCapacity *= DEFAULT_ARRAY_LENGTH;
-  }
-  this.childColumns = new ColumnVector[1];
-  this.childColumns[0] = ColumnVector.allocate(childCapacity, 
childType, memMode);
-  this.resultArray = new Array(this.childColumns[0]);
-  this.resultStruct = null;
-} else if (type instanceof StructType) {
-  StructType st = (StructType)type;
-  this.childColumns = new ColumnVector[st.fields().length];
-  for (int i = 0; i < childColumns.length; ++i) {
-this.childColumns[i] = ColumnVector.allocate(capacity, 
st.fields()[i].dataType(), memMode);
-  }
-  this.resultArray = null;
-  this.resultStruct = new ColumnarBatch.Row(this.childColumns);
-} else if (type instanceof CalendarIntervalType) {
-  // Two columns. Months as int. Microseconds as Long.
-  this.childColumns = new ColumnVector[2];
-  this.childColumns[0] = ColumnVector.allocate(capacity, 
DataTypes.IntegerType, memMode);
-  this.childColumns[1] = ColumnVector.allocate(capacity, 
DataTypes.LongType, memMode);
-  this.resultArray = null;
-  this.resultStruct = new ColumnarBatch.Row(this.childColumns);
-} else {
-  this.childColumns = null;
-  this.resultArray = null;
-  this.resultStruct = null;
-}
+this.isConstant = true;
--- End diff --

I think `isConstant` should belong to `MutableColumnVector`, because it's 
used to indicate that this column vector should not be updated.





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134114641
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
 ---
@@ -281,62 +255,14 @@ public Object get(int ordinal, DataType dataType) {
   /**
* Resets this column for writing. The currently stored values are no 
longer accessible.
*/
-  public void reset() {
-if (isConstant) return;
-
-if (childColumns != null) {
-  for (ColumnVector c: childColumns) {
-c.reset();
-  }
-}
-numNulls = 0;
-elementsAppended = 0;
-if (anyNullsSet) {
-  putNotNulls(0, capacity);
-  anyNullsSet = false;
-}
-  }
+  public abstract void reset();
--- End diff --

If `ColumnVector` is read-only, why do we need a `reset` API?





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134114520
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 ---
@@ -172,20 +177,26 @@ public void initBatch(MemoryMode memMode, StructType partitionColumns,
   }
 }
 
-columnarBatch = ColumnarBatch.allocate(batchSchema, memMode);
+int capacity = ColumnarBatch.DEFAULT_BATCH_SIZE;
+if (memMode == MemoryMode.OFF_HEAP) {
+  columnVectors = OffHeapColumnVector.allocateColumns(capacity, batchSchema);
+} else {
+  columnVectors = OnHeapColumnVector.allocateColumns(capacity, batchSchema);
--- End diff --

It seems we could have a `MutableColumnVector.allocateColumns(memMode)` for this logic.
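A hypothetical sketch of such a factory keyed on the memory mode; the names (`MemoryMode`, `allocateColumns`, the vector classes) mirror the discussion but are simplified stand-ins, not Spark's real signatures.

```java
// Hypothetical, simplified sketch of a memory-mode dispatching factory.
enum MemoryMode { ON_HEAP, OFF_HEAP }

abstract class MutableColumnVector {
    // One place that decides on-heap vs. off-heap for all columns.
    static MutableColumnVector[] allocateColumns(int capacity, int numFields, MemoryMode memMode) {
        MutableColumnVector[] vectors = new MutableColumnVector[numFields];
        for (int i = 0; i < numFields; i++) {
            vectors[i] = (memMode == MemoryMode.OFF_HEAP)
                ? new OffHeapColumnVector(capacity)
                : new OnHeapColumnVector(capacity);
        }
        return vectors;
    }
    abstract String mode();
}

class OnHeapColumnVector extends MutableColumnVector {
    OnHeapColumnVector(int capacity) {}
    @Override String mode() { return "ON_HEAP"; }
}

class OffHeapColumnVector extends MutableColumnVector {
    OffHeapColumnVector(int capacity) {}
    @Override String mode() { return "OFF_HEAP"; }
}

public class AllocateColumnsSketch {
    public static void main(String[] args) {
        MutableColumnVector[] cols =
            MutableColumnVector.allocateColumns(1024, 3, MemoryMode.OFF_HEAP);
        System.out.println(cols.length + " " + cols[0].mode());  // 3 OFF_HEAP
    }
}
```

Callers such as the vectorized Parquet reader would then pick the mode once instead of branching at every allocation site.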





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134114491
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
 ---
@@ -433,7 +434,8 @@ private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOE
   }
 
   private void readFixedLenByteArrayBatch(int rowId, int num,
-  ColumnVector column, int arrayLen) throws IOException {
+  MutableColumnVector column,
+  int arrayLen) throws IOException {
--- End diff --

nit:
```
private void xxx(
para1: XX,
para2: XX)
```





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r134114461
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -450,14 +450,13 @@ class CodegenContext {
   /**
* Returns the specialized code to set a given value in a column vector 
for a given `DataType`.
*/
-  def setValue(batch: String, row: String, dataType: DataType, ordinal: Int,
-  value: String): String = {
+  def setValue(vector: String, row: String, dataType: DataType, value: String): String = {
--- End diff --

nit: `row` -> `rowId`





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-16 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r133626997
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -505,18 +500,12 @@ public void filterNullsInColumn(int ordinal) {
 nullFilteredColumns.add(ordinal);
   }
 
-  private ColumnarBatch(StructType schema, int maxRows, MemoryMode memMode) {
+  public ColumnarBatch(StructType schema, ColumnVector[] columns, int capacity) {
 this.schema = schema;
-this.capacity = maxRows;
-this.columns = new ColumnVector[schema.size()];
+this.columns = columns;
+this.capacity = capacity;
--- End diff --

I found some places referring to `ColumnarBatch.capacity()`, so I'd rather be a little conservative and keep it for now.





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-16 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r133622586
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
 ---
@@ -89,14 +91,23 @@ class VectorizedHashMapGenerator(
|$generatedAggBufferSchema
|
|  public $generatedClassName() {
-   |batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
-   |  org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
-   |// TODO: Possibly generate this projection in HashAggregate directly
-   |aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
-   |  aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
-   |for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
-   |   aggregateBufferBatch.setColumn(i, batch.column(i+${groupingKeys.length}));
+   |batchVectors = new org.apache.spark.sql.execution.vectorized
+   |  .OnHeapColumnVector[schema.fields().length];
+   |for (int i = 0; i < schema.fields().length; i++) {
+   |  batchVectors[i] = new org.apache.spark.sql.execution.vectorized.OnHeapColumnVector(
+   |capacity, schema.fields()[i].dataType());
+   |}
+   |batch = new org.apache.spark.sql.execution.vectorized.ColumnarBatch(
+   |  schema, batchVectors, capacity);
+   |
+   |bufferVectors = new org.apache.spark.sql.execution.vectorized
+   |  .OnHeapColumnVector[aggregateBufferSchema.fields().length];
+   |for (int i = 0; i < aggregateBufferSchema.fields().length; i++) {
+   |  bufferVectors[i] = batchVectors[i + ${groupingKeys.length}];
|}
+   |// TODO: Possibly generate this projection in HashAggregate directly
--- End diff --

I'm sorry, but I'm not sure because this is from the original code.





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-16 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r133622350
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
 ---
@@ -89,14 +91,23 @@ class VectorizedHashMapGenerator(
|$generatedAggBufferSchema
|
|  public $generatedClassName() {
-   |batch = 
org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
-   |  org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
-   |// TODO: Possibly generate this projection in HashAggregate 
directly
-   |aggregateBufferBatch = 
org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
-   |  aggregateBufferSchema, 
org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
-   |for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
-   |   aggregateBufferBatch.setColumn(i, 
batch.column(i+${groupingKeys.length}));
+   |batchVectors = new org.apache.spark.sql.execution.vectorized
--- End diff --

Sure, I'll try it.





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-16 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r133618935
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 ---
@@ -40,8 +39,43 @@
   private long lengthData;
   private long offsetData;
 
-  protected OffHeapColumnVector(int capacity, DataType type) {
-super(capacity, type, MemoryMode.OFF_HEAP);
+  public OffHeapColumnVector(int capacity, DataType type) {
+super(capacity, type);
+
+if (type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType
--- End diff --

Sure, I'll try it.





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-16 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r133618947
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 ---
@@ -491,6 +525,22 @@ public void loadBytes(ColumnVector.Array array) {
 array.byteArrayOffset = 0;
   }
 
+  /**
+   * Reserve a integer column for ids of dictionary.
+   */
+  @Override
+  public OffHeapColumnVector reserveDictionaryIds(int capacity) {
--- End diff --

Sure, I'll try it.





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-16 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r133617360
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -307,64 +293,73 @@ public void update(int ordinal, Object value) {
 
 @Override
 public void setNullAt(int ordinal) {
+  assert (columns[ordinal] instanceof MutableColumnVector);
   assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNull(rowId);
+  ((MutableColumnVector) columns[ordinal]).putNull(rowId);
 }
 
 @Override
 public void setBoolean(int ordinal, boolean value) {
+  assert (columns[ordinal] instanceof MutableColumnVector);
   assert (!columns[ordinal].isConstant);
-  columns[ordinal].putNotNull(rowId);
-  columns[ordinal].putBoolean(rowId, value);
+  ((MutableColumnVector) columns[ordinal]).putNotNull(rowId);
--- End diff --

Sure, I'll add a private getter and update these.
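A self-contained, hypothetical sketch of that private getter (simplified names, not the actual `ColumnarBatch.Row` code): the `instanceof` assertion and the cast live in one place, and every setter goes through it.

```java
// Hypothetical, simplified sketch -- not the actual Spark classes.
class ColumnVector { boolean isConstant = false; }

class MutableColumnVector extends ColumnVector {
    int lastPut = -1;
    void putBoolean(int rowId, boolean value) { lastPut = rowId; }
}

public class RowWriterSketch {
    private final ColumnVector[] columns;
    RowWriterSketch(ColumnVector[] columns) { this.columns = columns; }

    // The suggested private getter: one place for the assertions and the cast.
    private MutableColumnVector getMutableColumn(int ordinal) {
        ColumnVector column = columns[ordinal];
        assert column instanceof MutableColumnVector : "read-only column at " + ordinal;
        assert !column.isConstant : "constant column at " + ordinal;
        return (MutableColumnVector) column;
    }

    public void setBoolean(int ordinal, int rowId, boolean value) {
        getMutableColumn(ordinal).putBoolean(rowId, value);
    }

    public static void main(String[] args) {
        MutableColumnVector c = new MutableColumnVector();
        RowWriterSketch row = new RowWriterSketch(new ColumnVector[] { c });
        row.setBoolean(0, 3, true);
        System.out.println(c.lastPut);  // 3
    }
}
```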





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-16 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r133617361
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnVector.java
 ---
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * This class adds write APIs to ColumnVector.
+ * It supports all the types and contains put APIs as well as their 
batched versions.
+ * The batched versions are preferable whenever possible.
+ *
+ * Capacity: The data stored is dense but the arrays are not fixed 
capacity. It is the
+ * responsibility of the caller to call reserve() to ensure there is 
enough room before adding
+ * elements. This means that the put() APIs do not check as in common 
cases (i.e. flat schemas),
+ * the lengths are known up front.
+ *
+ * A ColumnVector should be considered immutable once originally created. 
In other words, it is not
+ * valid to call put APIs after reads until reset() is called.
+ */
+public abstract class MutableColumnVector extends ColumnVector {
+
+  /**
+   * Resets this column for writing. The currently stored values are no 
longer accessible.
+   */
+  @Override
+  public void reset() {
+if (isConstant) return;
+
+if (childColumns != null) {
+  for (ColumnVector c: childColumns) {
+c.reset();
+  }
+}
+numNulls = 0;
+elementsAppended = 0;
+if (anyNullsSet) {
+  putNotNulls(0, capacity);
+  anyNullsSet = false;
+}
+  }
+
+  public void reserve(int requiredCapacity) {
+if (requiredCapacity > capacity) {
+  int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
+  if (requiredCapacity <= newCapacity) {
+try {
+  reserveInternal(newCapacity);
+} catch (OutOfMemoryError outOfMemoryError) {
+  throwUnsupportedException(requiredCapacity, outOfMemoryError);
+}
+  } else {
+throwUnsupportedException(requiredCapacity, null);
+  }
+}
+  }
+
+  private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
+String message = "Cannot reserve additional contiguous bytes in the vectorized reader " +
+"(requested = " + requiredCapacity + " bytes). As a workaround, you can disable the " +
+"vectorized reader by setting " + SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() +
+" to false.";
+
+if (cause != null) {
+  throw new RuntimeException(message, cause);
--- End diff --

Thanks. I'll update it.





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-16 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r133579534
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -505,18 +500,12 @@ public void filterNullsInColumn(int ordinal) {
 nullFilteredColumns.add(ordinal);
   }
 
-  private ColumnarBatch(StructType schema, int maxRows, MemoryMode memMode) {
+  public ColumnarBatch(StructType schema, ColumnVector[] columns, int capacity) {
 this.schema = schema;
-this.capacity = maxRows;
-this.columns = new ColumnVector[schema.size()];
+this.columns = columns;
+this.capacity = capacity;
--- End diff --

Does `capacity` really mean anything here anymore, since the `ColumnVector`s are now allocated and populated outside? Could we just initialize `this.numRows = 0` and delay initialization of `this.filteredRows` until `setNumRows()` is called?
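A hypothetical sketch of that lazy-initialization idea (simplified stand-ins for `ColumnarBatch` internals, not the real class): `numRows` starts at 0 and the filtered-rows bitset is sized on the first `setNumRows()` call, so the batch no longer needs a `capacity` of its own.

```java
import java.util.BitSet;

// Hypothetical, simplified sketch of a batch without a capacity field.
public class LazyBatchSketch {
    private int numRows = 0;
    private BitSet filteredRows;  // allocated lazily in setNumRows()

    public void setNumRows(int numRows) {
        this.numRows = numRows;
        if (filteredRows == null || filteredRows.size() < numRows) {
            filteredRows = new BitSet(numRows);
        }
    }

    public int numRows() { return numRows; }

    public static void main(String[] args) {
        LazyBatchSketch batch = new LazyBatchSketch();
        batch.setNumRows(128);
        System.out.println(batch.numRows());  // 128
    }
}
```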





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r133434818
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnVector.java
 ---
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * This class adds write APIs to ColumnVector.
+ * It supports all the types and contains put APIs as well as their 
batched versions.
+ * The batched versions are preferable whenever possible.
+ *
+ * Capacity: The data stored is dense but the arrays are not fixed 
capacity. It is the
+ * responsibility of the caller to call reserve() to ensure there is 
enough room before adding
+ * elements. This means that the put() APIs do not check as in common 
cases (i.e. flat schemas),
+ * the lengths are known up front.
+ *
+ * A ColumnVector should be considered immutable once originally created. 
In other words, it is not
--- End diff --

This contradicts the name of this class. Maybe "reusable" is a better way of describing what is going on here. Also cc @michal-databricks 





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r133421918
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
 ---
@@ -89,14 +91,23 @@ class VectorizedHashMapGenerator(
|$generatedAggBufferSchema
|
|  public $generatedClassName() {
-   |batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
-   |  org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
-   |// TODO: Possibly generate this projection in HashAggregate directly
-   |aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
-   |  aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
-   |for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
-   |   aggregateBufferBatch.setColumn(i, batch.column(i+${groupingKeys.length}));
+   |batchVectors = new org.apache.spark.sql.execution.vectorized
+   |  .OnHeapColumnVector[schema.fields().length];
+   |for (int i = 0; i < schema.fields().length; i++) {
+   |  batchVectors[i] = new org.apache.spark.sql.execution.vectorized.OnHeapColumnVector(
+   |capacity, schema.fields()[i].dataType());
+   |}
+   |batch = new org.apache.spark.sql.execution.vectorized.ColumnarBatch(
+   |  schema, batchVectors, capacity);
+   |
+   |bufferVectors = new org.apache.spark.sql.execution.vectorized
+   |  .OnHeapColumnVector[aggregateBufferSchema.fields().length];
+   |for (int i = 0; i < aggregateBufferSchema.fields().length; i++) {
+   |  bufferVectors[i] = batchVectors[i + ${groupingKeys.length}];
|}
+   |// TODO: Possibly generate this projection in HashAggregate directly
--- End diff --

Can you elaborate?





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r133421832
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
 ---
@@ -89,14 +91,23 @@ class VectorizedHashMapGenerator(
|$generatedAggBufferSchema
|
|  public $generatedClassName() {
-   |batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
-   |  org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
-   |// TODO: Possibly generate this projection in HashAggregate directly
-   |aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
-   |  aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
-   |for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
-   |   aggregateBufferBatch.setColumn(i, batch.column(i+${groupingKeys.length}));
+   |batchVectors = new org.apache.spark.sql.execution.vectorized
--- End diff --

This happens quite a few times. It might be better to create a static util 
method that creates the vectors for you.





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r133420801
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 ---
@@ -491,6 +525,22 @@ public void loadBytes(ColumnVector.Array array) {
 array.byteArrayOffset = 0;
   }
 
+  /**
+   * Reserve a integer column for ids of dictionary.
+   */
+  @Override
+  public OffHeapColumnVector reserveDictionaryIds(int capacity) {
--- End diff --

Same comment as in the constructor.





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r133420728
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 ---
@@ -40,8 +39,43 @@
   private long lengthData;
   private long offsetData;
 
-  protected OffHeapColumnVector(int capacity, DataType type) {
-super(capacity, type, MemoryMode.OFF_HEAP);
+  public OffHeapColumnVector(int capacity, DataType type) {
+super(capacity, type);
+
+if (type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType
--- End diff --

Can you try to move this initialization logic into the parent class? We 
should be able to factor out the on/off-heap specific initialization logic into 
a separate method.
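A hypothetical sketch of that factoring (simplified names, not the real Spark classes): the parent constructor holds the shared logic and delegates the memory-specific allocation to a `reserveInternal()` hook, which is roughly the template-method shape the comment suggests.

```java
// Hypothetical, simplified sketch of hoisting shared constructor logic
// into the parent, leaving only allocation to the subclass hook.
abstract class MutableColumnVector {
    protected final int capacity;

    protected MutableColumnVector(int capacity) {
        this.capacity = capacity;
        // Memory-mode-specific allocation happens in the subclass hook.
        // Caveat: fields the hook assigns must not have initializers in the
        // subclass, or those initializers would overwrite the allocation
        // after super() returns.
        reserveInternal(capacity);
    }

    protected abstract void reserveInternal(int newCapacity);
}

class OnHeapIntVector extends MutableColumnVector {
    private int[] data;  // deliberately no initializer (see caveat above)

    OnHeapIntVector(int capacity) { super(capacity); }

    @Override
    protected void reserveInternal(int newCapacity) { data = new int[newCapacity]; }

    int allocated() { return data.length; }
}

public class InitHookSketch {
    public static void main(String[] args) {
        System.out.println(new OnHeapIntVector(16).allocated());  // 16
    }
}
```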





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r133419451
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnVector.java
 ---
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * This class adds write APIs to ColumnVector.
+ * It supports all the types and contains put APIs as well as their 
batched versions.
+ * The batched versions are preferable whenever possible.
+ *
+ * Capacity: The data stored is dense but the arrays are not fixed 
capacity. It is the
+ * responsibility of the caller to call reserve() to ensure there is 
enough room before adding
+ * elements. This means that the put() APIs do not check as in common 
cases (i.e. flat schemas),
+ * the lengths are known up front.
+ *
+ * A ColumnVector should be considered immutable once originally created. 
In other words, it is not
+ * valid to call put APIs after reads until reset() is called.
+ */
+public abstract class MutableColumnVector extends ColumnVector {
+
+  /**
+   * Resets this column for writing. The currently stored values are no 
longer accessible.
+   */
+  @Override
+  public void reset() {
+if (isConstant) return;
+
+if (childColumns != null) {
+  for (ColumnVector c: childColumns) {
+c.reset();
+  }
+}
+numNulls = 0;
+elementsAppended = 0;
+if (anyNullsSet) {
+  putNotNulls(0, capacity);
+  anyNullsSet = false;
+}
+  }
+
+  public void reserve(int requiredCapacity) {
+if (requiredCapacity > capacity) {
+  int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
+  if (requiredCapacity <= newCapacity) {
+try {
+  reserveInternal(newCapacity);
+} catch (OutOfMemoryError outOfMemoryError) {
+  throwUnsupportedException(requiredCapacity, outOfMemoryError);
+}
+  } else {
+throwUnsupportedException(requiredCapacity, null);
+  }
+}
+  }
+
+  private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
+String message = "Cannot reserve additional contiguous bytes in the vectorized reader " +
+"(requested = " + requiredCapacity + " bytes). As a workaround, you can disable the " +
+"vectorized reader by setting " + SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() +
+" to false.";
+
+if (cause != null) {
+  throw new RuntimeException(message, cause);
--- End diff --

You are allowed to pass `null` as a cause to the `RuntimeException` 
constructor.
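A small self-contained demonstration of that point, using illustrative names: `Throwable`'s two-argument constructor accepts a `null` cause, so the if/else branching can collapse to a single call.

```java
// Demonstrates that a null cause is legal for RuntimeException(String, Throwable).
public class NullCauseDemo {
    static RuntimeException build(String message, Throwable cause) {
        return new RuntimeException(message, cause);  // cause may be null
    }

    public static void main(String[] args) {
        RuntimeException plain = build("cannot reserve capacity", null);
        RuntimeException wrapped = build("cannot reserve capacity", new OutOfMemoryError());
        System.out.println(plain.getCause() == null);                       // true
        System.out.println(wrapped.getCause() instanceof OutOfMemoryError); // true
    }
}
```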





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/18958#discussion_r133418831
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---
@@ -307,64 +293,73 @@ public void update(int ordinal, Object value) {
 
     @Override
     public void setNullAt(int ordinal) {
+      assert (columns[ordinal] instanceof MutableColumnVector);
       assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNull(rowId);
+      ((MutableColumnVector) columns[ordinal]).putNull(rowId);
     }
 
     @Override
     public void setBoolean(int ordinal, boolean value) {
+      assert (columns[ordinal] instanceof MutableColumnVector);
       assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNotNull(rowId);
-      columns[ordinal].putBoolean(rowId, value);
+      ((MutableColumnVector) columns[ordinal]).putNotNull(rowId);
--- End diff --

Maybe move the assertion and the cast into a private getter?
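One way to read that suggestion: centralize the `instanceof` assertion and the cast in a single private accessor so each typed setter stays a one-liner. A self-contained sketch — the stub `ColumnVector`/`MutableColumnVector` classes and the `ColumnarRow` wrapper here only mimic the shapes in the PR; their bodies are hypothetical:

```java
// Minimal stand-ins for the PR's classes; the bodies are hypothetical.
abstract class ColumnVector {
    boolean isConstant = false;
}

class MutableColumnVector extends ColumnVector {
    boolean storedValue;
    void putNotNull(int rowId) { /* would clear the null bit for rowId */ }
    void putBoolean(int rowId, boolean value) { storedValue = value; }
}

class ColumnarRow {
    private final ColumnVector[] columns;
    private final int rowId = 0;

    ColumnarRow(ColumnVector[] columns) { this.columns = columns; }

    // The suggested private getter: one place for the assertions and the cast.
    private MutableColumnVector getMutableColumn(int ordinal) {
        assert columns[ordinal] instanceof MutableColumnVector;
        assert !columns[ordinal].isConstant;
        return (MutableColumnVector) columns[ordinal];
    }

    public void setBoolean(int ordinal, boolean value) {
        MutableColumnVector column = getMutableColumn(ordinal);
        column.putNotNull(rowId);
        column.putBoolean(rowId, value);
    }
}
```

Every other typed setter (`setInt`, `setLong`, ...) would then call `getMutableColumn(ordinal)` instead of repeating the cast.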





[GitHub] spark pull request #18958: [SPARK-21745][SQL] Refactor ColumnVector hierarch...

2017-08-16 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/18958

[SPARK-21745][SQL] Refactor ColumnVector hierarchy to make ColumnVector read-only and to introduce MutableColumnVector.

## What changes were proposed in this pull request?

This is a refactoring of the `ColumnVector` hierarchy and related classes.

1. make `ColumnVector` read-only
2. introduce `MutableColumnVector` with write interface
3. remove `ReadOnlyColumnVector`
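The shape described by these three points — getters on a read-only base class, the whole write interface on a subclass — can be illustrated with a toy int-only vector (purely hypothetical, not the PR's code):

```java
// Read-only base: exposes getters only, no put() methods.
abstract class ColumnVector {
    public abstract int getInt(int rowId);
}

// Mutable subclass: the write interface lives only here.
class MutableColumnVector extends ColumnVector {
    private final int[] data;

    MutableColumnVector(int capacity) { data = new int[capacity]; }

    @Override
    public int getInt(int rowId) { return data[rowId]; }

    public void putInt(int rowId, int value) { data[rowId] = value; }
}
```

Consumers that only read can be handed a `ColumnVector` reference and cannot call `putInt` without an explicit downcast, which is the point of making the base class read-only.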

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-21745

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18958.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18958


commit e4e22412c5ab23766a6908ec9e1a7931bcd52a54
Author: Takuya UESHIN 
Date:   2017-08-15T04:09:16Z

Refactor ColumnVector hierarchy to make ColumnVector read-only and to 
introduce MutableColumnVector.

commit cd0de397bba202cd5173e8aee0fc0bec2615295c
Author: Takuya UESHIN 
Date:   2017-08-15T04:38:32Z

Modify VectorizedHashMapGenerator to use OnHeapColumnVector directly.



