[jira] [Commented] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses

2017-10-28 Thread Wenchen Fan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16223835#comment-16223835
 ] 

Wenchen Fan commented on SPARK-18394:
-------------------------------------------

Resolved by https://github.com/apache/spark/pull/18959

> Executing the same query twice in a row results in CodeGenerator cache misses
> ------------------------------------------------------------------------------
>
> Key: SPARK-18394
> URL: https://issues.apache.org/jira/browse/SPARK-18394
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
> Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop
>Reporter: Jonny Serencsa
>Assignee: Takeshi Yamamuro
> Fix For: 2.3.0
>
>
> Executing the query:
> {noformat}
> select
> l_returnflag,
> l_linestatus,
> sum(l_quantity) as sum_qty,
> sum(l_extendedprice) as sum_base_price,
> sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
> sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
> avg(l_quantity) as avg_qty,
> avg(l_extendedprice) as avg_price,
> avg(l_discount) as avg_disc,
> count(*) as count_order
> from
> lineitem_1_row
> where
> l_shipdate <= date_sub('1998-12-01', '90')
> group by
> l_returnflag,
> l_linestatus
> ;
> {noformat}
> twice (in succession) will result in CodeGenerator cache misses in BOTH
> executions. Since the query is identical, I would expect the same code to be
> generated.
> It turns out the generated code is not exactly the same, resulting in cache
> misses when performing the lookup in the CodeGenerator cache, even though the
> code is equivalent.
> Below is (a portion of) the generated code for two runs of the query:
> run-1
> {noformat}
> import java.nio.ByteBuffer;
> import java.nio.ByteOrder;
> import scala.collection.Iterator;
> import org.apache.spark.sql.types.DataType;
> import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
> import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
> import org.apache.spark.sql.execution.columnar.MutableUnsafeRow;
> public SpecificColumnarIterator generate(Object[] references) {
>   return new SpecificColumnarIterator();
> }
> class SpecificColumnarIterator extends org.apache.spark.sql.execution.columnar.ColumnarIterator {
>   private ByteOrder nativeOrder = null;
>   private byte[][] buffers = null;
>   private UnsafeRow unsafeRow = new UnsafeRow(7);
>   private BufferHolder bufferHolder = new BufferHolder(unsafeRow);
>   private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7);
>   private MutableUnsafeRow mutableRow = null;
>   private int currentRow = 0;
>   private int numRowsInBatch = 0;
>   private scala.collection.Iterator input = null;
>   private DataType[] columnTypes = null;
>   private int[] columnIndexes = null;
>   private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor;
>   private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor1;
>   private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor2;
>   private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor3;
>   private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor4;
>   private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor5;
>   private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor6;
>   public SpecificColumnarIterator() {
>     this.nativeOrder = ByteOrder.nativeOrder();
>     this.buffers = new byte[7][];
>     this.mutableRow = new MutableUnsafeRow(rowWriter);
>   }
>   public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
>     this.input = input;
>     this.columnTypes = columnTypes;
>     this.columnIndexes = columnIndexes;
>   }
>   public boolean hasNext() {
>     if (currentRow < numRowsInBatch) {
>       return true;
>     }
>     if (!input.hasNext()) {
>       return false;
>     }
>     org.apache.spark.sql.execution.columnar.CachedBatch batch = (org.apache.spark.sql.execution.columnar.CachedBatch) input.next();
>     currentRow = 0;
>     numRowsInBatch = batch.numRows();
>     for (int i = 0; i < columnIndexes.length; i++) {
>       buffers[i] = batch.buffers()[columnIndexes[i]];
>     }
>     accessor = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder));
>     accessor1 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder));
>     accessor2 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder));
>     accessor3 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder));
>     accessor4 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder));
>     accessor5 = new

[jira] [Commented] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses

2017-08-15 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16128243#comment-16128243
 ] 

Takeshi Yamamuro commented on SPARK-18394:
-------------------------------------------

Any update? I checked and found that master still has this issue; I just ran 
the query above and dumped the output names at 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala#L102.
{code}
17/08/16 02:13:13 WARN BaseSessionStateBuilder$$anon$3: L_QUANTITY#9015,L_RETURNFLAG#9019,l_shipdate#9021,L_TAX#9018,L_DISCOUNT#9017,L_LINESTATUS#9020,L_EXTENDEDPRICE#9016
17/08/16 02:13:13 WARN BaseSessionStateBuilder$$anon$3: L_RETURNFLAG#9142,L_DISCOUNT#9140,L_EXTENDEDPRICE#9139,L_QUANTITY#9138,L_LINESTATUS#9143,l_shipdate#9144,L_TAX#9141
17/08/16 02:13:13 WARN BaseSessionStateBuilder$$anon$3: L_QUANTITY#9305,L_TAX#9308,l_shipdate#9311,L_DISCOUNT#9307,L_RETURNFLAG#9309,L_LINESTATUS#9310,L_EXTENDEDPRICE#9306
17/08/16 02:13:13 WARN BaseSessionStateBuilder$$anon$3: L_EXTENDEDPRICE#9451,L_QUANTITY#9450,L_RETURNFLAG#9454,L_TAX#9453,L_DISCOUNT#9452,l_shipdate#9456,L_LINESTATUS#9455
17/08/16 02:13:13 WARN BaseSessionStateBuilder$$anon$3: L_LINESTATUS#9600,l_shipdate#9601,L_DISCOUNT#9597,L_TAX#9598,L_EXTENDEDPRICE#9596,L_RETURNFLAG#9599,L_QUANTITY#9595
17/08/16 02:13:13 WARN BaseSessionStateBuilder$$anon$3: L_QUANTITY#9740,L_TAX#9743,l_shipdate#9746,L_DISCOUNT#9742,L_EXTENDEDPRICE#9741,L_LINESTATUS#9745,L_RETURNFLAG#9744
...
{code}
The attribute order is different between the two runs, so Spark generates 
different code in `GenerateColumnAccessor`.
Also, I quickly checked that `AttributeSet.toSeq` outputs attributes in a 
different order:
{code}
scala> import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, ExprId}
scala> import org.apache.spark.sql.types.IntegerType
scala> val attr1 = AttributeReference("c1", IntegerType)(exprId = ExprId(1098))
scala> val attr2 = AttributeReference("c2", IntegerType)(exprId = ExprId(107))
scala> val attr3 = AttributeReference("c3", IntegerType)(exprId = ExprId(838))
scala> val attrSetA = AttributeSet(attr1 :: attr2 :: attr3 :: Nil)
scala> val attr4 = AttributeReference("c4", IntegerType)(exprId = ExprId(389))
scala> val attr5 = AttributeReference("c5", IntegerType)(exprId = ExprId(89329))
scala> val attrSetB = AttributeSet(attr4 :: attr5 :: Nil)
scala> (attrSetA ++ attrSetB).toSeq
res1: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = WrappedArray(c3#838, c4#389, c2#107, c5#89329, c1#1098)

scala> val attr1 = AttributeReference("c1", IntegerType)(exprId = ExprId(392))
scala> val attr2 = AttributeReference("c2", IntegerType)(exprId = ExprId(92))
scala> val attr3 = AttributeReference("c3", IntegerType)(exprId = ExprId(87))
scala> val attrSetA = AttributeSet(attr1 :: attr2 :: attr3 :: Nil)
scala> val attr4 = AttributeReference("c4", IntegerType)(exprId = ExprId(9023920))
scala> val attr5 = AttributeReference("c5", IntegerType)(exprId = ExprId(522))
scala> val attrSetB = AttributeSet(attr4 :: attr5 :: Nil)
scala> (attrSetA ++ attrSetB).toSeq
res2: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = WrappedArray(c3#87, c1#392, c5#522, c2#92, c4#9023920)
{code}

As suggested, to fix this, `AttributeSet.toSeq` needs to output attributes in 
a consistent order, as in:
https://github.com/apache/spark/compare/master...maropu:SPARK-18394
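For reference, a minimal sketch of the idea (assuming we simply sort on the expression id; the linked patch may implement the ordering differently):
{code}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}

// Hypothetical helper, not Spark's API: impose a deterministic order on the
// set's output so that hash-bucket placement of fresh exprIds no longer
// leaks into the generated code.
def toDeterministicSeq(set: AttributeSet): Seq[Attribute] =
  set.toSeq.sortBy(_.exprId.id)
{code}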
 


[jira] [Commented] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses

2016-11-22 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688345#comment-15688345
 ] 

Herman van Hovell commented on SPARK-18394:
-------------------------------------------

Great! Ping me if you need any assistance.


[jira] [Commented] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses

2016-11-22 Thread Jonny Serencsa (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688333#comment-15688333
 ] 

Jonny Serencsa commented on SPARK-18394:


Yes I would. 


[jira] [Commented] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses

2016-11-22 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688330#comment-15688330
 ] 

Herman van Hovell commented on SPARK-18394:
-------------------------------------------

Nice, this is a good find.

I think we either need to make AttributeSet iteration deterministic, or make a 
change to the following line in the SparkPlanner: 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala#L96

Would you be interested in working on this? 




[jira] [Commented] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses

2016-11-22 Thread Jonny Serencsa (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688244#comment-15688244
 ] 

Jonny Serencsa commented on SPARK-18394:


Already did that. The problem happens with both HiveTableScanExec and 
InMemoryTableScanExec. The nondeterministic ordering is an artifact of the 
hash code of AttributeEquals being based on the hash code of the exprId. 
Thus, when you iterate through an AttributeSet, the order of the attributes 
is not consistent.
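To illustrate, a minimal, self-contained sketch of the effect (`FakeAttr` is a stand-in for Attribute, not a Spark class):
{code}
import scala.collection.immutable.HashSet

// Stand-in for AttributeEquals: hashing is driven by the expression id
// alone, mirroring how AttributeSet keys its members.
case class FakeAttr(name: String, id: Long) {
  override def hashCode: Int = id.hashCode
}

// The same three logical columns, with the fresh ids each run assigns.
val run1 = HashSet(FakeAttr("c1", 1098), FakeAttr("c2", 107), FakeAttr("c3", 838))
val run2 = HashSet(FakeAttr("c1", 392), FakeAttr("c2", 92), FakeAttr("c3", 87))

// Iteration order follows the hash codes, so it changes with the ids.
println(run1.toSeq.map(_.name))
println(run2.toSeq.map(_.name))
{code}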


[jira] [Commented] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses

2016-11-22 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688225#comment-15688225
 ] 

Herman van Hovell commented on SPARK-18394:
-------------------------------------------

Ok, that is fair.

What strikes me as odd is that the column order the columnar cache produces 
differs between the two plans. This is what causes the code generator to 
create two different 'programs', and it is what ultimately causes your 
caching problems. Could you re-run this without the in-memory cache and see 
if you are still hitting the problem?

I'll have a look on my end to see what is going on in the in-memory cache.


[jira] [Commented] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses

2016-11-22 Thread Jonny Serencsa (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688135#comment-15688135
 ] 

Jonny Serencsa commented on SPARK-18394:


This problem was discovered during high-concurrency experiments where I was 
running the aforementioned query thousands of times via many concurrent 
clients. Eventually, after 5K-10K executions, the JVM-level code cache had to 
be purged, resulting in 10-second-long pauses.

My expectation was that since the exact same query is being executed, Spark 
would not have to re-generate the bytecode (because of its own CodeGenCache). 
After removing the WHERE clause from the query, this was in fact the case, 
and the JVM-level cache purging disappeared.
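For context, a rough sketch of why equivalent-but-not-identical programs miss (this assumes a compile cache keyed on the generated source text, as I understand Spark's to be; it is an illustration, not Spark's actual code):
{code}
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

// Illustration only: a compile cache keyed on source text. Two programs that
// are semantically equivalent but textually different always miss.
val compileCache: LoadingCache[String, String] =
  CacheBuilder.newBuilder().maximumSize(100).build(
    new CacheLoader[String, String] {
      override def load(source: String): String = {
        println("cache miss: compiling")  // stands in for the Janino compile
        source                            // a real cache would hold the compiled class
      }
    })

compileCache.get("accessor = new DoubleColumnAccessor(buffers[0]);")  // miss
compileCache.get("accessor = new DoubleColumnAccessor(buffers[0]);")  // hit
compileCache.get("accessor = new DoubleColumnAccessor(buffers[1]);")  // equivalent, different text: miss
{code}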

I made this a Major issue because it didn't seem like Spark's CodeGenCache was 
working as expected. Perhaps I am mistaken. 

My repro of the issue above required me to set breakpoints in the debugger.




[jira] [Commented] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses

2016-11-22 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688059#comment-15688059
 ] 

Herman van Hovell commented on SPARK-18394:
-------------------------------------------

I am not able to reproduce this. Could you also explain to me why this is a 
major issue?

I have used the following script:
{noformat}
sc.setLogLevel("INFO")
spark.sql("create database if not exists tpc")
spark.sql("drop table if exists tpc.lineitem")
spark.sql("""
create table tpc.lineitem (
  L_ORDERKEY bigint,
  L_PARTKEY bigint,
  L_SUPPKEY bigint,
  L_LINENUMBER bigint,
  L_QUANTITY double,
  L_EXTENDEDPRICE double,
  L_DISCOUNT double,
  L_TAX double,
  L_RETURNFLAG string,
  L_LINESTATUS string,
  L_SHIPDATE string,
  L_COMMITDATE string,
  L_RECEIPTDATE string,
  L_SHIPINSTRUCT string,
  L_SHIPMODE string,
  L_COMMENT string
) using parquet
""")

spark.sql(s"""
insert into tpc.lineitem
select id as L_ORDERKEY,
   id % 10 as L_PARTKEY,
   id % 50 as L_SUPPKEY,
   id as L_LINENUMBER,
   rand(3) * 10 as L_QUANTITY,
   rand(5) * 50 as L_EXTENDEDPRICE,
   rand(7) * 20 as L_DISCOUNT,
   0.18d as L_TAX,
   case when rand(11) < 0.7d then 'Y' else 'N' end as L_RETURNFLAG,
   case when rand(13) < 0.4d then 'A' when rand(17) < 0.2d then 'B' else 'C' end as L_LINESTATUS,
   date_format(date_add(date '1998-08-05', id % 365), 'yyyy-MM-dd') as L_SHIPDATE,
   date_format(date_add(date '1998-08-01', id % 365), 'yyyy-MM-dd') as L_COMMITDATE,
   date_format(date_add(date '1998-08-03', id % 365), 'yyyy-MM-dd') as L_RECEIPTDATE,
   'DUMMY' as L_SHIPINSTRUCT,
   case when rand(19) < 0.7d then 'AIR' else 'LAND' end as L_SHIPMODE,
   'DUMMY' as L_COMMENT
from   range(100)
""")


val df = spark.sql("""
select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
tpc.lineitem
where
l_shipdate <= date_sub('1998-12-01', '90')
group by
l_returnflag,
l_linestatus
""")

df.show()

df.show()
{noformat}


