[jira] [Commented] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16223835#comment-16223835 ] Wenchen Fan commented on SPARK-18394:

resolved by https://github.com/apache/spark/pull/18959

> Executing the same query twice in a row results in CodeGenerator cache misses
>
> Key: SPARK-18394
> URL: https://issues.apache.org/jira/browse/SPARK-18394
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop
> Reporter: Jonny Serencsa
> Assignee: Takeshi Yamamuro
> Fix For: 2.3.0
>
> Executing the query:
> {noformat}
> select
>     l_returnflag,
>     l_linestatus,
>     sum(l_quantity) as sum_qty,
>     sum(l_extendedprice) as sum_base_price,
>     sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
>     sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
>     avg(l_quantity) as avg_qty,
>     avg(l_extendedprice) as avg_price,
>     avg(l_discount) as avg_disc,
>     count(*) as count_order
> from
>     lineitem_1_row
> where
>     l_shipdate <= date_sub('1998-12-01', '90')
> group by
>     l_returnflag,
>     l_linestatus;
> {noformat}
> twice (in succession) will result in CodeGenerator cache misses in BOTH executions. Since the query is identical, I would expect the same code to be generated.
> It turns out the generated code is not exactly the same, resulting in cache misses when performing the lookup in the CodeGenerator cache. Yet, the code is equivalent.
> Below is (some portion of the) generated code for two runs of the query:
>
> run-1
> {noformat}
> import java.nio.ByteBuffer;
> import java.nio.ByteOrder;
> import scala.collection.Iterator;
> import org.apache.spark.sql.types.DataType;
> import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
> import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
> import org.apache.spark.sql.execution.columnar.MutableUnsafeRow;
>
> public SpecificColumnarIterator generate(Object[] references) {
>   return new SpecificColumnarIterator();
> }
>
> class SpecificColumnarIterator extends org.apache.spark.sql.execution.columnar.ColumnarIterator {
>   private ByteOrder nativeOrder = null;
>   private byte[][] buffers = null;
>   private UnsafeRow unsafeRow = new UnsafeRow(7);
>   private BufferHolder bufferHolder = new BufferHolder(unsafeRow);
>   private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7);
>   private MutableUnsafeRow mutableRow = null;
>   private int currentRow = 0;
>   private int numRowsInBatch = 0;
>   private scala.collection.Iterator input = null;
>   private DataType[] columnTypes = null;
>   private int[] columnIndexes = null;
>   private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor;
>   private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor1;
>   private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor2;
>   private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor3;
>   private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor4;
>   private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor5;
>   private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor6;
>
>   public SpecificColumnarIterator() {
>     this.nativeOrder = ByteOrder.nativeOrder();
>     this.buffers = new byte[7][];
>     this.mutableRow = new MutableUnsafeRow(rowWriter);
>   }
>
>   public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
>     this.input = input;
>     this.columnTypes = columnTypes;
>     this.columnIndexes = columnIndexes;
>   }
>
>   public boolean hasNext() {
>     if (currentRow < numRowsInBatch) {
>       return true;
>     }
>     if (!input.hasNext()) {
>       return false;
>     }
>     org.apache.spark.sql.execution.columnar.CachedBatch batch =
>       (org.apache.spark.sql.execution.columnar.CachedBatch) input.next();
>     currentRow = 0;
>     numRowsInBatch = batch.numRows();
>     for (int i = 0; i < columnIndexes.length; i++) {
>       buffers[i] = batch.buffers()[columnIndexes[i]];
>     }
>     accessor = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder));
>     accessor1 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder));
>     accessor2 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder));
>     accessor3 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder));
>     accessor4 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder));
>     accessor5 = new
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16128243#comment-16128243 ] Takeshi Yamamuro commented on SPARK-18394:

Any update? I checked, and master still has this issue; I just ran the query above and dumped the output names at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala#L102.
{code}
17/08/16 02:13:13 WARN BaseSessionStateBuilder$$anon$3: L_QUANTITY#9015,L_RETURNFLAG#9019,l_shipdate#9021,L_TAX#9018,L_DISCOUNT#9017,L_LINESTATUS#9020,L_EXTENDEDPRICE#9016
17/08/16 02:13:13 WARN BaseSessionStateBuilder$$anon$3: L_RETURNFLAG#9142,L_DISCOUNT#9140,L_EXTENDEDPRICE#9139,L_QUANTITY#9138,L_LINESTATUS#9143,l_shipdate#9144,L_TAX#9141
17/08/16 02:13:13 WARN BaseSessionStateBuilder$$anon$3: L_QUANTITY#9305,L_TAX#9308,l_shipdate#9311,L_DISCOUNT#9307,L_RETURNFLAG#9309,L_LINESTATUS#9310,L_EXTENDEDPRICE#9306
17/08/16 02:13:13 WARN BaseSessionStateBuilder$$anon$3: L_EXTENDEDPRICE#9451,L_QUANTITY#9450,L_RETURNFLAG#9454,L_TAX#9453,L_DISCOUNT#9452,l_shipdate#9456,L_LINESTATUS#9455
17/08/16 02:13:13 WARN BaseSessionStateBuilder$$anon$3: L_LINESTATUS#9600,l_shipdate#9601,L_DISCOUNT#9597,L_TAX#9598,L_EXTENDEDPRICE#9596,L_RETURNFLAG#9599,L_QUANTITY#9595
17/08/16 02:13:13 WARN BaseSessionStateBuilder$$anon$3: L_QUANTITY#9740,L_TAX#9743,l_shipdate#9746,L_DISCOUNT#9742,L_EXTENDEDPRICE#9741,L_LINESTATUS#9745,L_RETURNFLAG#9744
...
{code}
The attribute order differs from run to run, and Spark then generates different code in `GenerateColumnAccessor`.
Also, I quickly checked that `AttributeSet.toSeq` outputs attributes in a different order:
{code}
scala> val attr1 = AttributeReference("c1", IntegerType)(exprId = ExprId(1098))
scala> val attr2 = AttributeReference("c2", IntegerType)(exprId = ExprId(107))
scala> val attr3 = AttributeReference("c3", IntegerType)(exprId = ExprId(838))
scala> val attrSetA = AttributeSet(attr1 :: attr2 :: attr3 :: Nil)
scala> val attr4 = AttributeReference("c4", IntegerType)(exprId = ExprId(389))
scala> val attr5 = AttributeReference("c5", IntegerType)(exprId = ExprId(89329))
scala> val attrSetB = AttributeSet(attr4 :: attr5 :: Nil)
scala> (attrSetA ++ attrSetB).toSeq
res1: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = WrappedArray(c3#838, c4#389, c2#107, c5#89329, c1#1098)

scala> val attr1 = AttributeReference("c1", IntegerType)(exprId = ExprId(392))
scala> val attr2 = AttributeReference("c2", IntegerType)(exprId = ExprId(92))
scala> val attr3 = AttributeReference("c3", IntegerType)(exprId = ExprId(87))
scala> val attrSetA = AttributeSet(attr1 :: attr2 :: attr3 :: Nil)
scala> val attr4 = AttributeReference("c4", IntegerType)(exprId = ExprId(9023920))
scala> val attr5 = AttributeReference("c5", IntegerType)(exprId = ExprId(522))
scala> val attrSetB = AttributeSet(attr4 :: attr5 :: Nil)
scala> (attrSetA ++ attrSetB).toSeq
res2: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = WrappedArray(c3#87, c1#392, c5#522, c2#92, c4#9023920)
{code}
As suggested, to fix this, `AttributeSet.toSeq` needs to output attributes in a consistent order, like: https://github.com/apache/spark/compare/master...maropu:SPARK-18394
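The behavior described in this thread can be reproduced outside Spark: when set membership hashes on an expression id, hash-set iteration order tracks whatever ids a particular run happened to assign. Below is a minimal Java sketch; the `Attr` class is an illustrative stand-in for `AttributeReference` (not Spark's actual code), and sorting by name at the end is just one way to impose a canonical order, not necessarily what the linked branch does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class AttrOrderDemo {
    // Illustrative stand-in for Catalyst's AttributeReference: as with
    // AttributeEquals, equality and hashing go through the exprId alone.
    static final class Attr {
        final String name;
        final long exprId;
        Attr(String name, long exprId) { this.name = name; this.exprId = exprId; }
        @Override public boolean equals(Object o) {
            return o instanceof Attr && ((Attr) o).exprId == exprId;
        }
        @Override public int hashCode() { return Long.hashCode(exprId); }
    }

    // Iteration order of a hash set is driven by the element hash codes,
    // i.e. by whatever exprIds this particular run assigned.
    static List<String> hashIterationOrder(long... exprIds) {
        Set<Attr> set = new HashSet<>();
        for (int i = 0; i < exprIds.length; i++) {
            set.add(new Attr("c" + (i + 1), exprIds[i]));
        }
        return set.stream().map(a -> a.name).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The same five columns, "planned" twice: names are identical but each
        // run assigns fresh exprIds (values taken from the REPL session above).
        List<String> run1 = hashIterationOrder(1098, 107, 838, 389, 89329);
        List<String> run2 = hashIterationOrder(392, 92, 87, 9023920, 522);
        System.out.println("run1: " + run1); // order depends on the exprIds
        System.out.println("run2: " + run2); // often a different permutation

        // A deterministic fix: emit attributes in a canonical order, so the
        // generated source text is identical across runs.
        List<String> canon1 = new ArrayList<>(run1);
        List<String> canon2 = new ArrayList<>(run2);
        Collections.sort(canon1);
        Collections.sort(canon2);
        System.out.println(canon1.equals(canon2)); // true
    }
}
```

Any run-stable sort key works here; the point is only that a canonical order makes the emitted accessor declarations, and therefore the codegen cache key, reproducible.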
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688345#comment-15688345 ] Herman van Hovell commented on SPARK-18394:

Great! Ping me if you need any assistance.
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688333#comment-15688333 ] Jonny Serencsa commented on SPARK-18394:

Yes I would.
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688330#comment-15688330 ] Herman van Hovell commented on SPARK-18394:

Nice, this is a good find. I think we either need to make AttributeSet iteration deterministic, or make a change to the following line in the SparkPlanner: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala#L96

Would you be interested in working on this?
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688244#comment-15688244 ] Jonny Serencsa commented on SPARK-18394:

Already did that. The problem happens with both HiveScanExec and InMemoryScanExec. The indeterminate ordering is an artifact of the hash code for AttributeEquals involving a hash code of the exprId. Thus, when you iterate through an AttributeSet, the order of the attributes is not consistent.
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688225#comment-15688225 ] Herman van Hovell commented on SPARK-18394:

Ok, that is fair. What strikes me as odd is that the column order the columnar cache produces differs between the two plans. This is what causes the code generator to create two different 'programs', and what in the end causes your caching problems. Could you re-run this without the in-memory cache and see if you are still hitting this problem? I'll have a look on my end to see what is going on in the in-memory cache.
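The cache-miss mechanism described above can be illustrated with a minimal sketch. This is hypothetical code, not Spark's actual CodeGenerator cache (which is a Guava cache keyed on the generated source text); it only demonstrates why a cache keyed on source text cannot recognize two semantically equivalent programs whose accessor declarations appear in a different order:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical source-text-keyed codegen cache, sketching the lookup
// behavior of a cache like Spark's CodeGenerator cache.
public class CodegenCacheDemo {
    private static final Map<String, Integer> cache = new HashMap<>();
    private static int compilations = 0;

    // Returns a "compiled program" id; compiles only on a cache miss.
    static int compile(String source) {
        return cache.computeIfAbsent(source, s -> ++compilations);
    }

    public static void main(String[] args) {
        // Two semantically equivalent programs whose column accessors are
        // declared in a different order, as in the two runs of the query.
        String run1 = "DoubleColumnAccessor accessor; StringColumnAccessor accessor1;";
        String run2 = "StringColumnAccessor accessor; DoubleColumnAccessor accessor1;";

        compile(run1);
        compile(run2); // miss: equivalent program, different text
        compile(run1); // hit: identical text
        System.out.println("compilations = " + compilations); // prints "compilations = 2"
    }
}
```

Keying on source text keeps lookups cheap, but it makes the cache sensitive to any nondeterminism in code generation, such as the column ordering difference discussed here.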
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688135#comment-15688135 ] Jonny Serencsa commented on SPARK-18394: This problem was discovered during high-concurrency experiments in which I ran the aforementioned query thousands of times via many concurrent clients. Eventually, after 5K-10K executions, the JVM-level code cache had to be purged, resulting in 10-second-long pauses. My expectation was that since the exact same query was being executed, Spark would not have to re-generate the bytecode (because of its own CodeGenCache). After removing the WHERE clause from the query, this was in fact the case, and the JVM-level cache purging disappeared. I made this a Major issue because it didn't seem like Spark's CodeGenCache was working as expected. Perhaps I am mistaken. My repro above required setting breakpoints in the debugger.
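The long pauses after thousands of executions are consistent with a bounded cache churning on textually distinct but equivalent entries. The following is a hypothetical bounded LRU sketch (Spark's real cache is a fixed-size Guava cache; the class names and capacity here are illustrative only) showing how identical queries that generate slightly different source text defeat the cache and force continual recompilation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical bounded LRU codegen cache: once it fills with distinct
// (but semantically equivalent) entries, every new "identical" query
// evicts an old entry and pays the compilation cost again.
public class LruCodegenCache {
    private final int maxEntries;
    private final Map<String, Object> cache;
    int evictions = 0;

    LruCodegenCache(int maxEntries) {
        this.maxEntries = maxEntries;
        this.cache = new LinkedHashMap<String, Object>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Object> e) {
                boolean evict = size() > LruCodegenCache.this.maxEntries;
                if (evict) evictions++;
                return evict;
            }
        };
    }

    Object get(String source) {
        Object program = cache.get(source);
        if (program == null) {
            program = new Object(); // "compile" on a miss
            cache.put(source, program); // may evict the eldest entry
        }
        return program;
    }

    public static void main(String[] args) {
        LruCodegenCache c = new LruCodegenCache(100);
        // Each run of the "same" query generates slightly different source
        // text, so the cache churns instead of hitting.
        for (int run = 0; run < 1000; run++) {
            c.get("class SpecificColumnarIterator_v" + run + " { }");
        }
        System.out.println("evictions = " + c.evictions); // prints "evictions = 900"
    }
}
```

With deterministic code generation, all 1000 runs would map to one cache entry and there would be no evictions, which matches the observation that removing the WHERE clause (and with it the nondeterministic code) made the purging disappear.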
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688059#comment-15688059 ] Herman van Hovell commented on SPARK-18394: --- I am not able to reproduce this. Could you also explain why this is a major issue? I have used the following script:
{noformat}
sc.setLogLevel("INFO")

spark.sql("create database if not exists tpc")
spark.sql("drop table if exists tpc.lineitem")
spark.sql("""
create table tpc.lineitem (
  L_ORDERKEY bigint,
  L_PARTKEY bigint,
  L_SUPPKEY bigint,
  L_LINENUMBER bigint,
  L_QUANTITY double,
  L_EXTENDEDPRICE double,
  L_DISCOUNT double,
  L_TAX double,
  L_RETURNFLAG string,
  L_LINESTATUS string,
  L_SHIPDATE string,
  L_COMMITDATE string,
  L_RECEIPTDATE string,
  L_SHIPINSTRUCT string,
  L_SHIPMODE string,
  L_COMMENT string
) using parquet
""")
spark.sql(s"""
insert into tpc.lineitem
select
  id as L_ORDERKEY,
  id % 10 as L_PARTKEY,
  id % 50 as L_SUPPKEY,
  id as L_LINENUMBER,
  rand(3) * 10 as L_QUANTITY,
  rand(5) * 50 as L_EXTENDEDPRICE,
  rand(7) * 20 as L_DISCOUNT,
  0.18d as L_TAX,
  case when rand(11) < 0.7d then 'Y' else 'N' end as L_RETURNFLAG,
  case when rand(13) < 0.4d then 'A' when rand(17) < 0.2d then 'B' else 'C' end as L_LINESTATUS,
  date_format(date_add(date '1998-08-05', id % 365), 'yyyy-MM-dd') as L_SHIPDATE,
  date_format(date_add(date '1998-08-01', id % 365), 'yyyy-MM-dd') as L_COMMITDATE,
  date_format(date_add(date '1998-08-03', id % 365), 'yyyy-MM-dd') as L_RECEIPTDATE,
  'DUMMY' as L_SHIPINSTRUCT,
  case when rand(19) < 0.7d then 'AIR' else 'LAND' end as L_SHIPMODE,
  'DUMMY' as L_COMMENT
from range(100)
""")
val df = spark.sql("""
select
  l_returnflag,
  l_linestatus,
  sum(l_quantity) as sum_qty,
  sum(l_extendedprice) as sum_base_price,
  sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
  sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
  avg(l_quantity) as avg_qty,
  avg(l_extendedprice) as avg_price,
  avg(l_discount) as avg_disc,
  count(*) as count_order
from tpc.lineitem
where l_shipdate <= date_sub('1998-12-01', '90')
group by l_returnflag, l_linestatus
""")
df.show()
df.show()
{noformat}