[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688225#comment-15688225 ]
Herman van Hovell commented on SPARK-18394: ------------------------------------------- Ok, that is fair. What strikes me as odd is that the column order that the columnar cache produces is different the two both plans. This is what causes the code generator to create two different 'programs' and what in the end causes the your caching problems . Could you re-run this without the in-memory cache, and see if you are still hitting this problem. I'll have a look on my end to see what is going on in the in-memory cache. > Executing the same query twice in a row results in CodeGenerator cache misses > ----------------------------------------------------------------------------- > > Key: SPARK-18394 > URL: https://issues.apache.org/jira/browse/SPARK-18394 > Project: Spark > Issue Type: Bug > Components: SQL > Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop > Reporter: Jonny Serencsa > Priority: Minor > > Executing the query: > {noformat} > select > l_returnflag, > l_linestatus, > sum(l_quantity) as sum_qty, > sum(l_extendedprice) as sum_base_price, > sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, > sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, > avg(l_quantity) as avg_qty, > avg(l_extendedprice) as avg_price, > avg(l_discount) as avg_disc, > count(*) as count_order > from > lineitem_1_row > where > l_shipdate <= date_sub('1998-12-01', '90') > group by > l_returnflag, > l_linestatus > ; > {noformat} > twice (in succession), will result in CodeGenerator cache misses in BOTH > executions. Since the query is identical, I would expect the same code to be > generated. > Turns out, the generated code is not exactly the same, resulting in cache > misses when performing the lookup in the CodeGenerator cache. Yet, the code > is equivalent. > Below is (some portion of the) generated code for two runs of the query: > run-1 > {noformat} > import java.nio.ByteBuffer; > import java.nio.ByteOrder; > import scala.collection.Iterator; > import org.apache.spark.sql.types.DataType; > import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; > import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; > import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; > public SpecificColumnarIterator generate(Object[] references) { > return new SpecificColumnarIterator(); > } > class SpecificColumnarIterator extends > org.apache.spark.sql.execution.columnar.ColumnarIterator { > private ByteOrder nativeOrder = null; > private byte[][] buffers = null; > private UnsafeRow unsafeRow = new UnsafeRow(7); > private BufferHolder bufferHolder = new BufferHolder(unsafeRow); > private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7); > private MutableUnsafeRow mutableRow = null; > private int currentRow = 0; > private int numRowsInBatch = 0; > private scala.collection.Iterator input = null; > private DataType[] columnTypes = null; > private int[] columnIndexes = null; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor1; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor2; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor3; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor4; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor5; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor6; > public SpecificColumnarIterator() { > this.nativeOrder = ByteOrder.nativeOrder(); > this.buffers = new byte[7][]; > this.mutableRow = new MutableUnsafeRow(rowWriter); > } > public void initialize(Iterator input, DataType[] columnTypes, int[] > columnIndexes) { > this.input = input; > this.columnTypes = columnTypes; > this.columnIndexes = columnIndexes; > } > public boolean hasNext() { > if (currentRow < numRowsInBatch) { > return true; > } > if (!input.hasNext()) { > return false; > } > org.apache.spark.sql.execution.columnar.CachedBatch batch = > (org.apache.spark.sql.execution.columnar.CachedBatch) input.next(); > currentRow = 0; > numRowsInBatch = batch.numRows(); > for (int i = 0; i < columnIndexes.length; i ++) { > buffers[i] = batch.buffers()[columnIndexes[i]]; > } > accessor = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder)); > accessor1 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder)); > accessor2 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder)); > accessor3 = new > org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder)); > accessor4 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder)); > accessor5 = new > org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[5]).order(nativeOrder)); > accessor6 = new > org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[6]).order(nativeOrder)); > return hasNext(); > } > public InternalRow next() { > currentRow += 1; > bufferHolder.reset(); > rowWriter.zeroOutNullBytes(); > accessor.extractTo(mutableRow, 0); > accessor1.extractTo(mutableRow, 1); > accessor2.extractTo(mutableRow, 2); > accessor3.extractTo(mutableRow, 3); > accessor4.extractTo(mutableRow, 4); > accessor5.extractTo(mutableRow, 5); > accessor6.extractTo(mutableRow, 6); > unsafeRow.setTotalSize(bufferHolder.totalSize()); > return unsafeRow; > } > } > {noformat} > run-2: > {noformat} > import java.nio.ByteBuffer; > import java.nio.ByteOrder; > import scala.collection.Iterator; > import org.apache.spark.sql.types.DataType; > import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; > import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; > import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; > public SpecificColumnarIterator generate(Object[] references) { > return new SpecificColumnarIterator(); > } > class SpecificColumnarIterator extends > org.apache.spark.sql.execution.columnar.ColumnarIterator { > private ByteOrder nativeOrder = null; > private byte[][] buffers = null; > private UnsafeRow unsafeRow = new UnsafeRow(7); > private BufferHolder bufferHolder = new BufferHolder(unsafeRow); > private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7); > private MutableUnsafeRow mutableRow = null; > private int currentRow = 0; > private int numRowsInBatch = 0; > private scala.collection.Iterator input = null; > private DataType[] columnTypes = null; > private int[] columnIndexes = null; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor1; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor2; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor3; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor4; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor5; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor6; > public SpecificColumnarIterator() { > this.nativeOrder = ByteOrder.nativeOrder(); > this.buffers = new byte[7][]; > this.mutableRow = new MutableUnsafeRow(rowWriter); > } > public void initialize(Iterator input, DataType[] columnTypes, int[] > columnIndexes) { > this.input = input; > this.columnTypes = columnTypes; > this.columnIndexes = columnIndexes; > } > public boolean hasNext() { > if (currentRow < numRowsInBatch) { > return true; > } > if (!input.hasNext()) { > return false; > } > org.apache.spark.sql.execution.columnar.CachedBatch batch = > (org.apache.spark.sql.execution.columnar.CachedBatch) input.next(); > currentRow = 0; > numRowsInBatch = batch.numRows(); > for (int i = 0; i < columnIndexes.length; i ++) { > buffers[i] = batch.buffers()[columnIndexes[i]]; > } > accessor = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder)); > accessor1 = new > org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder)); > accessor2 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder)); > accessor3 = new > org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder)); > accessor4 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder)); > accessor5 = new > org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[5]).order(nativeOrder)); > accessor6 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[6]).order(nativeOrder)); > return hasNext(); > } > public InternalRow next() { > currentRow += 1; > bufferHolder.reset(); > rowWriter.zeroOutNullBytes(); > accessor.extractTo(mutableRow, 0); > accessor1.extractTo(mutableRow, 1); > accessor2.extractTo(mutableRow, 2); > accessor3.extractTo(mutableRow, 3); > accessor4.extractTo(mutableRow, 4); > accessor5.extractTo(mutableRow, 5); > accessor6.extractTo(mutableRow, 6); > unsafeRow.setTotalSize(bufferHolder.totalSize()); > return unsafeRow; > } > } > {noformat} > Diff-ing the two files reveals that the "accessor*" variable definitions are > permuted. -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org