[ https://issues.apache.org/jira/browse/SPARK-15822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331371#comment-15331371 ]
Pete Robbins commented on SPARK-15822:
--------------------------------------

The generated code is:

{code}
Top Arrival Carrier Cancellations:
Found 5 WholeStageCodegen subtrees.

== Subtree 1 / 5 ==
*HashAggregate(key=[Origin#16,UniqueCarrier#8], functions=[partial_count(1)], output=[Origin#16,UniqueCarrier#8,count#296L])
+- *Project [UniqueCarrier#8, Origin#16]
   +- *Filter (((((((isnotnull(Origin#16) && isnotnull(UniqueCarrier#8)) && isnotnull(Cancelled#21)) && isnotnull(CancellationCode#22)) && NOT (Cancelled#21 = 0)) && (CancellationCode#22 = A)) && isnotnull(Dest#17)) && (Dest#17 = ORD))
      +- *Scan csv [UniqueCarrier#8,Origin#16,Dest#17,Cancelled#21,CancellationCode#22] Format: CSV, InputPaths: file:/home/robbins/brandberry/2008.csv, PushedFilters: [IsNotNull(Origin), IsNotNull(UniqueCarrier), IsNotNull(Cancelled), IsNotNull(CancellationCode), ..., ReadSchema: struct<UniqueCarrier:string,Origin:string,Dest:string,Cancelled:int,CancellationCode:string>

Generated code:
public Object generate(Object[] references) {
  return new GeneratedIterator(references);
}

final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
  private Object[] references;
  private boolean agg_initAgg;
  private boolean agg_bufIsNull;
  private long agg_bufValue;
  private agg_VectorizedHashMap agg_vectorizedHashMap;
  private java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row> agg_vectorizedHashMapIter;
  private org.apache.spark.sql.execution.aggregate.HashAggregateExec agg_plan;
  private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
  private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
  private org.apache.spark.unsafe.KVIterator agg_mapIter;
  private org.apache.spark.sql.execution.metric.SQLMetric agg_peakMemory;
  private org.apache.spark.sql.execution.metric.SQLMetric agg_spillSize;
  private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
  private scala.collection.Iterator scan_input;
  private org.apache.spark.sql.execution.metric.SQLMetric filter_numOutputRows;
  private UnsafeRow filter_result;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter;
  private UnsafeRow project_result;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter;
  private UnsafeRow agg_result2;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner agg_unsafeRowJoiner;
  private org.apache.spark.sql.execution.metric.SQLMetric wholestagecodegen_numOutputRows;
  private org.apache.spark.sql.execution.metric.SQLMetric wholestagecodegen_aggTime;
  private UnsafeRow wholestagecodegen_result;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder wholestagecodegen_holder;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter wholestagecodegen_rowWriter;

  public GeneratedIterator(Object[] references) {
    this.references = references;
  }

  public void init(int index, scala.collection.Iterator inputs[]) {
    partitionIndex = index;
    agg_initAgg = false;

    agg_vectorizedHashMap = new agg_VectorizedHashMap();

    this.agg_plan = (org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0];

    this.agg_peakMemory = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
    this.agg_spillSize = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
    this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
    scan_input = inputs[0];
    this.filter_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[4];
    filter_result = new UnsafeRow(5);
    this.filter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 128);
    this.filter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder, 5);
    project_result = new UnsafeRow(2);
    this.project_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 64);
    this.project_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder, 2);
    agg_result2 = new UnsafeRow(2);
    this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result2, 64);
    this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 2);
    agg_unsafeRowJoiner = agg_plan.createUnsafeJoiner();
    this.wholestagecodegen_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[7];
    this.wholestagecodegen_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[8];
    wholestagecodegen_result = new UnsafeRow(3);
    this.wholestagecodegen_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(wholestagecodegen_result, 64);
    this.wholestagecodegen_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(wholestagecodegen_holder, 3);
  }

  public class agg_VectorizedHashMap {
    private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch;
    private org.apache.spark.sql.execution.vectorized.ColumnarBatch aggregateBufferBatch;
    private int[] buckets;
    private int capacity = 1 << 16;
    private double loadFactor = 0.5;
    private int numBuckets = (int) (capacity / loadFactor);
    private int maxSteps = 2;
    private int numRows = 0;
    private org.apache.spark.sql.types.StructType schema = new org.apache.spark.sql.types.StructType().add("Origin", org.apache.spark.sql.types.DataTypes.StringType)
      .add("UniqueCarrier", org.apache.spark.sql.types.DataTypes.StringType)
      .add("count", org.apache.spark.sql.types.DataTypes.LongType);
    private org.apache.spark.sql.types.StructType aggregateBufferSchema =
      new org.apache.spark.sql.types.StructType().add("count", org.apache.spark.sql.types.DataTypes.LongType);

    public agg_VectorizedHashMap() {
      batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
        org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
      // TODO: Possibly generate this projection in HashAggregate directly
      aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
        aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
      for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
        aggregateBufferBatch.setColumn(i, batch.column(i+2));
      }

      buckets = new int[numBuckets];
      java.util.Arrays.fill(buckets, -1);
    }

    public org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row findOrInsert(UTF8String agg_key, UTF8String agg_key1) {
      long h = hash(agg_key, agg_key1);
      int step = 0;
      int idx = (int) h & (numBuckets - 1);
      while (step < maxSteps) {
        // Return bucket index if it's either an empty slot or already contains the key
        if (buckets[idx] == -1) {
          if (numRows < capacity) {
            // Initialize aggregate keys
            batch.column(0).putByteArray(numRows, agg_key.getBytes());
            batch.column(1).putByteArray(numRows, agg_key1.getBytes());

            agg_bufIsNull = false;
            agg_bufValue = 0L;

            // Initialize aggregate values

            if (!agg_bufIsNull) {
              batch.column(2).putLong(numRows, agg_bufValue);
            } else {
              batch.column(2).putNull(numRows);
            }

            buckets[idx] = numRows++;
            batch.setNumRows(numRows);
            aggregateBufferBatch.setNumRows(numRows);
            return aggregateBufferBatch.getRow(buckets[idx]);
          } else {
            // No more space
            return null;
          }
        } else if (equals(idx, agg_key, agg_key1)) {
          return aggregateBufferBatch.getRow(buckets[idx]);
        }
        idx = (idx + 1) & (numBuckets - 1);
        step++;
      }
      // Didn't find it
      return null;
    }

    private boolean equals(int idx, UTF8String agg_key, UTF8String agg_key1) {
      return (batch.column(0).getUTF8String(buckets[idx]).equals(agg_key)) && (batch.column(1).getUTF8String(buckets[idx]).equals(agg_key1));
    }

    private long hash(UTF8String agg_key, UTF8String agg_key1) {
      long agg_hash = 0;

      int agg_result = 0;
      for (int i = 0; i < agg_key.getBytes().length; i++) {
        int agg_hash1 = agg_key.getBytes()[i];
        agg_result = (agg_result ^ (0x9e3779b9)) + agg_hash1 + (agg_result << 6) + (agg_result >>> 2);
      }

      agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result + (agg_hash << 6) + (agg_hash >>> 2);

      int agg_result1 = 0;
      for (int i = 0; i < agg_key1.getBytes().length; i++) {
        int agg_hash2 = agg_key1.getBytes()[i];
        agg_result1 = (agg_result1 ^ (0x9e3779b9)) + agg_hash2 + (agg_result1 << 6) + (agg_result1 >>> 2);
      }

      agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result1 + (agg_hash << 6) + (agg_hash >>> 2);

      return agg_hash;
    }

    public java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row>
        rowIterator() {
      return batch.rowIterator();
    }

    public void close() {
      batch.close();
    }

  }

  private void agg_doAggregateWithKeys() throws java.io.IOException {
    agg_hashMap = agg_plan.createHashMap();

    while (scan_input.hasNext()) {
      InternalRow scan_row = (InternalRow) scan_input.next();
      scan_numOutputRows.add(1);
      boolean scan_isNull8 = scan_row.isNullAt(3);
      int scan_value8 = scan_isNull8 ? -1 : (scan_row.getInt(3));

      if (!(!(scan_isNull8))) continue;

      boolean filter_value3 = false;
      filter_value3 = scan_value8 == 0;
      boolean filter_value2 = false;
      filter_value2 = !(filter_value3);
      if (!filter_value2) continue;
      boolean scan_isNull9 = scan_row.isNullAt(4);
      UTF8String scan_value9 = scan_isNull9 ? null : (scan_row.getUTF8String(4));

      if (!(!(scan_isNull9))) continue;

      Object filter_obj = ((Expression) references[5]).eval(null);
      UTF8String filter_value10 = (UTF8String) filter_obj;
      boolean filter_value8 = false;
      filter_value8 = scan_value9.equals(filter_value10);
      if (!filter_value8) continue;
      boolean scan_isNull7 = scan_row.isNullAt(2);
      UTF8String scan_value7 = scan_isNull7 ? null : (scan_row.getUTF8String(2));

      if (!(!(scan_isNull7))) continue;

      Object filter_obj1 = ((Expression) references[6]).eval(null);
      UTF8String filter_value15 = (UTF8String) filter_obj1;
      boolean filter_value13 = false;
      filter_value13 = scan_value7.equals(filter_value15);
      if (!filter_value13) continue;

      boolean scan_isNull6 = scan_row.isNullAt(1);
      UTF8String scan_value6 = scan_isNull6 ? null : (scan_row.getUTF8String(1));

      if (!(!(scan_isNull6))) continue;

      boolean scan_isNull5 = scan_row.isNullAt(0);
      UTF8String scan_value5 = scan_isNull5 ? null : (scan_row.getUTF8String(0));

      if (!(!(scan_isNull5))) continue;

      filter_numOutputRows.add(1);

      UnsafeRow agg_unsafeRowAggBuffer = null;
      org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row agg_vectorizedAggBuffer = null;

      if (true) {
        if (!false && !false) {
          agg_vectorizedAggBuffer = agg_vectorizedHashMap.findOrInsert(
            scan_value6, scan_value5);
        }
      }

      if (agg_vectorizedAggBuffer == null) {
        // generate grouping key
        agg_holder.reset();

        agg_rowWriter.write(0, scan_value6);

        agg_rowWriter.write(1, scan_value5);
        agg_result2.setTotalSize(agg_holder.totalSize());
        int agg_value6 = 42;

        if (!false) {
          agg_value6 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(scan_value6.getBaseObject(), scan_value6.getBaseOffset(), scan_value6.numBytes(), agg_value6);
        }

        if (!false) {
          agg_value6 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(scan_value5.getBaseObject(), scan_value5.getBaseOffset(), scan_value5.numBytes(), agg_value6);
        }
        if (true) {
          // try to get the buffer from hash map
          agg_unsafeRowAggBuffer =
            agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result2, agg_value6);
        }
        if (agg_unsafeRowAggBuffer == null) {
          if (agg_sorter == null) {
            agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
          } else {
            agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
          }

          // the hash map had be spilled, it should have enough memory now,
          // try to allocate buffer again.
          agg_unsafeRowAggBuffer =
            agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result2, agg_value6);
          if (agg_unsafeRowAggBuffer == null) {
            // failed to allocate the first page
            throw new OutOfMemoryError("No enough memory for aggregation");
          }
        }
      }

      if (agg_vectorizedAggBuffer != null) {
        // update vectorized row

        // common sub-expressions

        // evaluate aggregate function
        long agg_value10 = agg_vectorizedAggBuffer.getLong(0);

        long agg_value9 = -1L;
        agg_value9 = agg_value10 + 1L;
        // update vectorized row
        agg_vectorizedAggBuffer.setLong(0, agg_value9);

      } else {
        // update unsafe row

        // common sub-expressions

        // evaluate aggregate function
        long agg_value13 = agg_unsafeRowAggBuffer.getLong(0);

        long agg_value12 = -1L;
        agg_value12 = agg_value13 + 1L;
        // update unsafe row buffer
        agg_unsafeRowAggBuffer.setLong(0, agg_value12);

      }
      if (shouldStop()) return;
    }

    agg_vectorizedHashMapIter = agg_vectorizedHashMap.rowIterator();

    agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter, agg_peakMemory, agg_spillSize);
  }

  protected void processNext() throws java.io.IOException {
    if (!agg_initAgg) {
      agg_initAgg = true;
      long wholestagecodegen_beforeAgg = System.nanoTime();
      agg_doAggregateWithKeys();
      wholestagecodegen_aggTime.add((System.nanoTime() - wholestagecodegen_beforeAgg) / 1000000);
    }

    // output the result

    while (agg_vectorizedHashMapIter.hasNext()) {
      wholestagecodegen_numOutputRows.add(1);
      org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row wholestagecodegen_vectorizedHashMapRow =
        (org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row)
        agg_vectorizedHashMapIter.next();

      wholestagecodegen_holder.reset();

      wholestagecodegen_rowWriter.zeroOutNullBytes();

      boolean wholestagecodegen_isNull = wholestagecodegen_vectorizedHashMapRow.isNullAt(0);
      UTF8String wholestagecodegen_value = wholestagecodegen_isNull ? null : (wholestagecodegen_vectorizedHashMapRow.getUTF8String(0));
      if (wholestagecodegen_isNull) {
        wholestagecodegen_rowWriter.setNullAt(0);
      } else {
        wholestagecodegen_rowWriter.write(0, wholestagecodegen_value);
      }

      boolean wholestagecodegen_isNull1 = wholestagecodegen_vectorizedHashMapRow.isNullAt(1);
      UTF8String wholestagecodegen_value1 = wholestagecodegen_isNull1 ? null : (wholestagecodegen_vectorizedHashMapRow.getUTF8String(1));
      if (wholestagecodegen_isNull1) {
        wholestagecodegen_rowWriter.setNullAt(1);
      } else {
        wholestagecodegen_rowWriter.write(1, wholestagecodegen_value1);
      }

      long wholestagecodegen_value2 = wholestagecodegen_vectorizedHashMapRow.getLong(2);
      wholestagecodegen_rowWriter.write(2, wholestagecodegen_value2);
      wholestagecodegen_result.setTotalSize(wholestagecodegen_holder.totalSize());

      append(wholestagecodegen_result);

      if (shouldStop()) return;
    }

    agg_vectorizedHashMap.close();

    while (agg_mapIter.next()) {
      wholestagecodegen_numOutputRows.add(1);
      UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
      UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();

      UnsafeRow agg_resultRow = agg_unsafeRowJoiner.join(agg_aggKey, agg_aggBuffer);

      append(agg_resultRow);

      if (shouldStop()) return;
    }

    agg_mapIter.close();
    if (agg_sorter == null) {
      agg_hashMap.free();
    }
  }
}
{code}
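For anyone reading along: the interesting part of subtree 1 is the generated agg_VectorizedHashMap, an open-addressing table that gives up after maxSteps = 2 linear probes and returns null, at which point the caller falls back to the regular UnsafeFixedWidthAggregationMap. A minimal standalone sketch of that probing scheme (hypothetical class, plain arrays standing in for the ColumnarBatch; not Spark code):

{code}
import java.util.Arrays;

public class ProbingMapSketch {
  private static final int CAPACITY = 1 << 16;
  private static final int NUM_BUCKETS = CAPACITY * 2;   // loadFactor = 0.5
  private static final int MAX_STEPS = 2;                // probe budget, as in the generated map

  private final String[] keyRows = new String[CAPACITY]; // stands in for the key columns
  private final long[] countRows = new long[CAPACITY];   // stands in for the "count" column
  private final int[] buckets = new int[NUM_BUCKETS];
  private int numRows = 0;

  public ProbingMapSketch() {
    Arrays.fill(buckets, -1);                            // -1 marks an empty bucket
  }

  // Returns the row index holding the key's aggregate buffer, inserting it on
  // first sight; returns -1 when the probe budget or capacity is exhausted,
  // which is the signal to fall back to the slower row-based map.
  public int findOrInsert(String key) {
    int idx = (key.hashCode() & Integer.MAX_VALUE) & (NUM_BUCKETS - 1);
    for (int step = 0; step < MAX_STEPS; step++) {
      if (buckets[idx] == -1) {
        if (numRows == CAPACITY) return -1;              // no more space
        keyRows[numRows] = key;
        countRows[numRows] = 0L;                         // initialize the aggregate buffer
        buckets[idx] = numRows++;
        return buckets[idx];
      } else if (keyRows[buckets[idx]].equals(key)) {
        return buckets[idx];                             // key already present
      }
      idx = (idx + 1) & (NUM_BUCKETS - 1);               // linear probe to the next bucket
    }
    return -1;                                           // probe budget exhausted
  }

  public void add(String key) {
    int row = findOrInsert(key);
    if (row != -1) countRows[row]++;                     // count(1): buffer += 1
  }

  public static void main(String[] args) {
    ProbingMapSketch m = new ProbingMapSketch();
    m.add("ORD|AA"); m.add("ORD|AA"); m.add("ORD|UA");
    System.out.println(m.countRows[m.findOrInsert("ORD|AA")]); // prints 2
  }
}
{code}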
{code}
== Subtree 2 / 5 ==
*Project [Origin#16, UniqueCarrier#8, round((cast((count#134L * 100) as double) / cast(total#97L as double)), 2) AS rank#173]
+- *SortMergeJoin [Origin#16, UniqueCarrier#8], [Origin#155, UniqueCarrier#147], Inner
   :- *Sort [Origin#16 ASC, UniqueCarrier#8 ASC], false, 0
   :  +- *HashAggregate(key=[Origin#16,UniqueCarrier#8], functions=[count(1)], output=[Origin#16,UniqueCarrier#8,count#134L])
   :     +- Exchange hashpartitioning(Origin#16, UniqueCarrier#8, 200)
   :        +- *HashAggregate(key=[Origin#16,UniqueCarrier#8], functions=[partial_count(1)], output=[Origin#16,UniqueCarrier#8,count#296L])
   :           +- *Project [UniqueCarrier#8, Origin#16]
   :              +- *Filter (((((((isnotnull(Origin#16) && isnotnull(UniqueCarrier#8)) && isnotnull(Cancelled#21)) && isnotnull(CancellationCode#22)) && NOT (Cancelled#21 = 0)) && (CancellationCode#22 = A)) && isnotnull(Dest#17)) && (Dest#17 = ORD))
   :                 +- *Scan csv [UniqueCarrier#8,Origin#16,Dest#17,Cancelled#21,CancellationCode#22] Format: CSV, InputPaths: file:/home/robbins/brandberry/2008.csv, PushedFilters: [IsNotNull(Origin), IsNotNull(UniqueCarrier), IsNotNull(Cancelled), IsNotNull(CancellationCode), ..., ReadSchema: struct<UniqueCarrier:string,Origin:string,Dest:string,Cancelled:int,CancellationCode:string>
   +- *Sort [Origin#155 ASC, UniqueCarrier#147 ASC], false, 0
      +- *HashAggregate(key=[Origin#155,UniqueCarrier#147], functions=[count(1)], output=[Origin#155,UniqueCarrier#147,total#97L])
         +- Exchange hashpartitioning(Origin#155, UniqueCarrier#147, 200)
            +- *HashAggregate(key=[Origin#155,UniqueCarrier#147], functions=[partial_count(1)], output=[Origin#155,UniqueCarrier#147,count#303L])
               +- *Project [UniqueCarrier#147, Origin#155]
                  +- *Filter (((isnotnull(UniqueCarrier#147) && isnotnull(Origin#155)) && isnotnull(Dest#156)) && (Dest#156 = ORD))
                     +- *Scan csv [UniqueCarrier#147,Origin#155,Dest#156] Format: CSV, InputPaths: file:/home/robbins/brandberry/2008.csv, PushedFilters: [IsNotNull(UniqueCarrier), IsNotNull(Origin), IsNotNull(Dest), EqualTo(Dest,ORD)], ReadSchema: struct<UniqueCarrier:string,Origin:string,Dest:string>

Generated code:
public Object generate(Object[] references) {
  return new GeneratedIterator(references);
}

final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
  private Object[] references;
  private scala.collection.Iterator smj_leftInput;
  private scala.collection.Iterator smj_rightInput;
  private InternalRow smj_leftRow;
  private InternalRow smj_rightRow;
  private UTF8String smj_value4;
  private UTF8String smj_value5;
  private java.util.ArrayList smj_matches;
  private UTF8String smj_value6;
  private UTF8String smj_value7;
  private UTF8String smj_value8;
  private boolean smj_isNull4;
  private UTF8String smj_value9;
  private boolean smj_isNull5;
  private long smj_value10;
  private org.apache.spark.sql.execution.metric.SQLMetric smj_numOutputRows;
  private UnsafeRow smj_result;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder smj_holder;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter smj_rowWriter;
  private UnsafeRow project_result;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter;

  public GeneratedIterator(Object[] references) {
    this.references = references;
  }

  public void init(int index, scala.collection.Iterator inputs[]) {
    partitionIndex = index;
    smj_leftInput = inputs[0];
    smj_rightInput = inputs[1];

    smj_rightRow = null;

    smj_matches = new java.util.ArrayList();

    this.smj_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
    smj_result = new UnsafeRow(6);
    this.smj_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(smj_result, 128);
    this.smj_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_holder, 6);
    project_result = new UnsafeRow(3);
    this.project_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 64);
    this.project_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder, 3);
  }

  private boolean findNextInnerJoinRows(
      scala.collection.Iterator leftIter,
      scala.collection.Iterator rightIter) {
    smj_leftRow = null;
    int comp = 0;
    while (smj_leftRow == null) {
      if (!leftIter.hasNext()) return false;
      smj_leftRow = (InternalRow) leftIter.next();

      boolean smj_isNull = smj_leftRow.isNullAt(0);
      UTF8String smj_value = smj_isNull ? null : (smj_leftRow.getUTF8String(0));

      boolean smj_isNull1 = smj_leftRow.isNullAt(1);
      UTF8String smj_value1 = smj_isNull1 ? null : (smj_leftRow.getUTF8String(1));
      if (smj_isNull || smj_isNull1) {
        smj_leftRow = null;
        continue;
      }
      if (!smj_matches.isEmpty()) {
        comp = 0;
        if (comp == 0) {
          comp = smj_value.compare(smj_value6);
        }
        if (comp == 0) {
          comp = smj_value1.compare(smj_value7);
        }

        if (comp == 0) {
          return true;
        }
        smj_matches.clear();
      }

      do {
        if (smj_rightRow == null) {
          if (!rightIter.hasNext()) {
            smj_value6 = smj_value;

            smj_value7 = smj_value1;

            return !smj_matches.isEmpty();
          }
          smj_rightRow = (InternalRow) rightIter.next();

          boolean smj_isNull2 = smj_rightRow.isNullAt(0);
          UTF8String smj_value2 = smj_isNull2 ? null : (smj_rightRow.getUTF8String(0));

          boolean smj_isNull3 = smj_rightRow.isNullAt(1);
          UTF8String smj_value3 = smj_isNull3 ? null : (smj_rightRow.getUTF8String(1));
          if (smj_isNull2 || smj_isNull3) {
            smj_rightRow = null;
            continue;
          }

          smj_value4 = smj_value2;

          smj_value5 = smj_value3;

        }

        comp = 0;
        if (comp == 0) {
          comp = smj_value.compare(smj_value4);
        }
        if (comp == 0) {
          comp = smj_value1.compare(smj_value5);
        }

        if (comp > 0) {
          smj_rightRow = null;
        } else if (comp < 0) {
          if (!smj_matches.isEmpty()) {
            smj_value6 = smj_value;

            smj_value7 = smj_value1;

            return true;
          }
          smj_leftRow = null;
        } else {
          smj_matches.add(smj_rightRow.copy());
          smj_rightRow = null;;
        }
      } while (smj_leftRow != null);
    }
    return false; // unreachable
  }

  protected void processNext() throws java.io.IOException {
    while (findNextInnerJoinRows(smj_leftInput, smj_rightInput)) {
      int smj_size = smj_matches.size();
      smj_isNull4 = smj_leftRow.isNullAt(0);
      smj_value8 = smj_isNull4 ? null : (smj_leftRow.getUTF8String(0));
      smj_isNull5 = smj_leftRow.isNullAt(1);
      smj_value9 = smj_isNull5 ? null : (smj_leftRow.getUTF8String(1));
      smj_value10 = smj_leftRow.getLong(2);
      for (int smj_i = 0; smj_i < smj_size; smj_i ++) {
        InternalRow smj_rightRow1 = (InternalRow) smj_matches.get(smj_i);

        smj_numOutputRows.add(1);

        long smj_value13 = smj_rightRow1.getLong(2);
        boolean project_isNull8 = false;
        double project_value8 = -1.0;
        if (!false) {
          project_value8 = (double) smj_value13;
        }
        boolean project_isNull3 = false;
        double project_value3 = -1.0;
        if (project_value8 == 0) {
          project_isNull3 = true;
        } else {
          long project_value5 = -1L;
          project_value5 = smj_value10 * 100L;
          boolean project_isNull4 = false;
          double project_value4 = -1.0;
          if (!false) {
            project_value4 = (double) project_value5;
          }
          project_value3 = (double)(project_value4 / project_value8);
        }
        boolean project_isNull2 = project_isNull3;
        double project_value2 = -1.0;
        if (!project_isNull2) {
          if (Double.isNaN(project_value3) || Double.isInfinite(project_value3)) {
            project_value2 = project_value3;
          } else {
            project_value2 = java.math.BigDecimal.valueOf(project_value3).
              setScale(2, java.math.BigDecimal.ROUND_HALF_UP).doubleValue();
          }
        }
        project_holder.reset();

        project_rowWriter.zeroOutNullBytes();

        if (smj_isNull4) {
          project_rowWriter.setNullAt(0);
        } else {
          project_rowWriter.write(0, smj_value8);
        }

        if (smj_isNull5) {
          project_rowWriter.setNullAt(1);
        } else {
          project_rowWriter.write(1, smj_value9);
        }

        if (project_isNull2) {
          project_rowWriter.setNullAt(2);
        } else {
          project_rowWriter.write(2, project_value2);
        }
        project_result.setTotalSize(project_holder.totalSize());
        append(project_result.copy());

      }
      if (shouldStop()) return;
    }
  }
}
{code}
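Subtree 2's Project evaluates the round(x, 2) in the rank expression with BigDecimal: division by zero yields null, NaN and Infinity pass through unrounded, and everything else is rounded HALF_UP to two decimal places. A minimal standalone sketch of those semantics (hypothetical class; RoundingMode.HALF_UP is the non-deprecated equivalent of the BigDecimal.ROUND_HALF_UP constant used above):

{code}
import java.math.BigDecimal;
import java.math.RoundingMode;

public class RankSketch {
  // rank = round((count * 100) / total, 2), with SQL-style null on divide-by-zero.
  static Double rank(long count, long total) {
    double denom = (double) total;
    if (denom == 0) return null;                 // division by zero -> null result
    double raw = ((double) (count * 100L)) / denom;
    if (Double.isNaN(raw) || Double.isInfinite(raw)) return raw; // pass through unrounded
    return BigDecimal.valueOf(raw).setScale(2, RoundingMode.HALF_UP).doubleValue();
  }

  public static void main(String[] args) {
    System.out.println(rank(37L, 1234L));        // 3.0 (37 * 100 / 1234 ~= 2.998 -> 3.00)
    System.out.println(rank(37L, 0L));           // null
  }
}
{code}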
{code}
== Subtree 3 / 5 ==
*HashAggregate(key=[Origin#155,UniqueCarrier#147], functions=[partial_count(1)], output=[Origin#155,UniqueCarrier#147,count#303L])
+- *Project [UniqueCarrier#147, Origin#155]
   +- *Filter (((isnotnull(UniqueCarrier#147) && isnotnull(Origin#155)) && isnotnull(Dest#156)) && (Dest#156 = ORD))
      +- *Scan csv [UniqueCarrier#147,Origin#155,Dest#156] Format: CSV, InputPaths: file:/home/robbins/brandberry/2008.csv, PushedFilters: [IsNotNull(UniqueCarrier), IsNotNull(Origin), IsNotNull(Dest), EqualTo(Dest,ORD)], ReadSchema: struct<UniqueCarrier:string,Origin:string,Dest:string>

Generated code:
public Object generate(Object[] references) {
  return new GeneratedIterator(references);
}

final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
  private Object[] references;
  private boolean agg_initAgg;
  private boolean agg_bufIsNull;
  private long agg_bufValue;
  private agg_VectorizedHashMap agg_vectorizedHashMap;
  private java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row> agg_vectorizedHashMapIter;
  private org.apache.spark.sql.execution.aggregate.HashAggregateExec agg_plan;
  private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
  private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
  private org.apache.spark.unsafe.KVIterator agg_mapIter;
  private org.apache.spark.sql.execution.metric.SQLMetric agg_peakMemory;
  private org.apache.spark.sql.execution.metric.SQLMetric agg_spillSize;
  private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
  private scala.collection.Iterator scan_input;
  private org.apache.spark.sql.execution.metric.SQLMetric filter_numOutputRows;
  private UnsafeRow filter_result;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter;
  private UnsafeRow project_result;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter;
  private UnsafeRow agg_result2;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner agg_unsafeRowJoiner;
  private org.apache.spark.sql.execution.metric.SQLMetric wholestagecodegen_numOutputRows;
  private org.apache.spark.sql.execution.metric.SQLMetric wholestagecodegen_aggTime;
  private UnsafeRow wholestagecodegen_result;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder wholestagecodegen_holder;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter wholestagecodegen_rowWriter;

  public GeneratedIterator(Object[] references) {
    this.references = references;
  }

  public void init(int index, scala.collection.Iterator inputs[]) {
    partitionIndex = index;
    agg_initAgg = false;

    agg_vectorizedHashMap = new agg_VectorizedHashMap();

    this.agg_plan = (org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0];

    this.agg_peakMemory = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
    this.agg_spillSize = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
    this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
    scan_input = inputs[0];
    this.filter_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[4];
    filter_result = new UnsafeRow(3);
    this.filter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 96);
    this.filter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder, 3);
    project_result = new UnsafeRow(2);
    this.project_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 64);
    this.project_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder, 2);
    agg_result2 = new UnsafeRow(2);
    this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result2, 64);
    this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 2);
    agg_unsafeRowJoiner = agg_plan.createUnsafeJoiner();
    this.wholestagecodegen_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[6];
    this.wholestagecodegen_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[7];
    wholestagecodegen_result = new UnsafeRow(3);
    this.wholestagecodegen_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(wholestagecodegen_result, 64);
    this.wholestagecodegen_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(wholestagecodegen_holder, 3);
  }

  public class agg_VectorizedHashMap {
    private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch;
    private org.apache.spark.sql.execution.vectorized.ColumnarBatch aggregateBufferBatch;
    private int[] buckets;
    private int capacity = 1 << 16;
    private double loadFactor = 0.5;
    private int numBuckets = (int) (capacity / loadFactor);
    private int maxSteps = 2;
    private int numRows = 0;
    private org.apache.spark.sql.types.StructType schema = new org.apache.spark.sql.types.StructType().add("Origin", org.apache.spark.sql.types.DataTypes.StringType)
      .add("UniqueCarrier", org.apache.spark.sql.types.DataTypes.StringType)
      .add("count", org.apache.spark.sql.types.DataTypes.LongType);
    private org.apache.spark.sql.types.StructType aggregateBufferSchema =
      new org.apache.spark.sql.types.StructType().add("count", org.apache.spark.sql.types.DataTypes.LongType);

    public agg_VectorizedHashMap() {
      batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
        org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
      // TODO: Possibly generate this projection in HashAggregate directly
      aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
        aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
      for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
        aggregateBufferBatch.setColumn(i, batch.column(i+2));
      }

      buckets = new int[numBuckets];
      java.util.Arrays.fill(buckets, -1);
    }

    public org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row findOrInsert(UTF8String agg_key, UTF8String agg_key1) {
      long h = hash(agg_key, agg_key1);
      int step = 0;
      int idx = (int) h & (numBuckets - 1);
      while (step < maxSteps) {
        // Return bucket index if it's either an empty slot or already contains the key
        if (buckets[idx] == -1) {
          if (numRows < capacity) {
            // Initialize aggregate keys
            batch.column(0).putByteArray(numRows, agg_key.getBytes());
            batch.column(1).putByteArray(numRows, agg_key1.getBytes());

            agg_bufIsNull = false;
            agg_bufValue = 0L;

            // Initialize aggregate values

            if (!agg_bufIsNull) {
              batch.column(2).putLong(numRows, agg_bufValue);
            } else {
              batch.column(2).putNull(numRows);
            }

            buckets[idx] = numRows++;
            batch.setNumRows(numRows);
            aggregateBufferBatch.setNumRows(numRows);
            return aggregateBufferBatch.getRow(buckets[idx]);
          } else {
            // No more space
            return null;
          }
        } else if (equals(idx, agg_key, agg_key1)) {
          return aggregateBufferBatch.getRow(buckets[idx]);
        }
        idx = (idx + 1) & (numBuckets - 1);
        step++;
      }
      // Didn't find it
      return null;
    }

    private boolean equals(int idx, UTF8String agg_key, UTF8String agg_key1) {
      return (batch.column(0).getUTF8String(buckets[idx]).equals(agg_key)) && (batch.column(1).getUTF8String(buckets[idx]).equals(agg_key1));
    }

    private long hash(UTF8String agg_key, UTF8String agg_key1) {
      long agg_hash = 0;

      int agg_result = 0;
      for (int i = 0; i < agg_key.getBytes().length; i++) {
        int agg_hash1 = agg_key.getBytes()[i];
        agg_result = (agg_result ^ (0x9e3779b9)) + agg_hash1 + (agg_result << 6) + (agg_result >>> 2);
      }

      agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result + (agg_hash << 6) + (agg_hash >>> 2);

      int agg_result1 = 0;
      for (int i = 0; i < agg_key1.getBytes().length; i++) {
        int agg_hash2 = agg_key1.getBytes()[i];
        agg_result1 = (agg_result1 ^ (0x9e3779b9)) + agg_hash2 + (agg_result1 << 6) + (agg_result1 >>> 2);
      }

      agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result1 + (agg_hash << 6) + (agg_hash >>> 2);

      return agg_hash;
    }

    public java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row>
        rowIterator() {
      return batch.rowIterator();
    }

    public void close() {
      batch.close();
    }

  }

  private void agg_doAggregateWithKeys() throws java.io.IOException {
    agg_hashMap = agg_plan.createHashMap();

    while (scan_input.hasNext()) {
      InternalRow scan_row = (InternalRow) scan_input.next();
      scan_numOutputRows.add(1);
      boolean scan_isNull5 = scan_row.isNullAt(2);
      UTF8String scan_value5 = scan_isNull5 ? null : (scan_row.getUTF8String(2));

      if (!(!(scan_isNull5))) continue;

      Object filter_obj = ((Expression) references[5]).eval(null);
      UTF8String filter_value4 = (UTF8String) filter_obj;
      boolean filter_value2 = false;
      filter_value2 = scan_value5.equals(filter_value4);
      if (!filter_value2) continue;

      boolean scan_isNull3 = scan_row.isNullAt(0);
      UTF8String scan_value3 = scan_isNull3 ? null : (scan_row.getUTF8String(0));

      if (!(!(scan_isNull3))) continue;

      boolean scan_isNull4 = scan_row.isNullAt(1);
      UTF8String scan_value4 = scan_isNull4 ? null : (scan_row.getUTF8String(1));

      if (!(!(scan_isNull4))) continue;

      filter_numOutputRows.add(1);

      UnsafeRow agg_unsafeRowAggBuffer = null;
      org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row agg_vectorizedAggBuffer = null;

      if (true) {
        if (!false && !false) {
          agg_vectorizedAggBuffer = agg_vectorizedHashMap.findOrInsert(
            scan_value4, scan_value3);
        }
      }

      if (agg_vectorizedAggBuffer == null) {
        // generate grouping key
        agg_holder.reset();

        agg_rowWriter.write(0, scan_value4);

        agg_rowWriter.write(1, scan_value3);
        agg_result2.setTotalSize(agg_holder.totalSize());
        int agg_value6 = 42;

        if (!false) {
          agg_value6 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(scan_value4.getBaseObject(), scan_value4.getBaseOffset(), scan_value4.numBytes(), agg_value6);
        }

        if (!false) {
          agg_value6 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(scan_value3.getBaseObject(), scan_value3.getBaseOffset(), scan_value3.numBytes(), agg_value6);
        }
        if (true) {
          // try to get the buffer from hash map
          agg_unsafeRowAggBuffer =
            agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result2, agg_value6);
        }
        if (agg_unsafeRowAggBuffer == null) {
          if (agg_sorter == null) {
            agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
          } else {
            agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
          }

          // the hash map had be spilled, it should have enough memory now,
          // try to allocate buffer again.
          agg_unsafeRowAggBuffer =
            agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result2, agg_value6);
          if (agg_unsafeRowAggBuffer == null) {
            // failed to allocate the first page
            throw new OutOfMemoryError("No enough memory for aggregation");
          }
        }
      }

      if (agg_vectorizedAggBuffer != null) {
        // update vectorized row

        // common sub-expressions

        // evaluate aggregate function
        long agg_value10 = agg_vectorizedAggBuffer.getLong(0);

        long agg_value9 = -1L;
        agg_value9 = agg_value10 + 1L;
        // update vectorized row
        agg_vectorizedAggBuffer.setLong(0, agg_value9);

      } else {
        // update unsafe row

        // common sub-expressions

        // evaluate aggregate function
        long agg_value13 = agg_unsafeRowAggBuffer.getLong(0);

        long agg_value12 = -1L;
        agg_value12 = agg_value13 + 1L;
        // update unsafe row buffer
        agg_unsafeRowAggBuffer.setLong(0, agg_value12);

      }
      if (shouldStop()) return;
    }

    agg_vectorizedHashMapIter = agg_vectorizedHashMap.rowIterator();

    agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter, agg_peakMemory, agg_spillSize);
  }

  protected void processNext() throws java.io.IOException {
    if (!agg_initAgg) {
      agg_initAgg = true;
      long wholestagecodegen_beforeAgg = System.nanoTime();
      agg_doAggregateWithKeys();
      wholestagecodegen_aggTime.add((System.nanoTime() - wholestagecodegen_beforeAgg) / 1000000);
    }

    // output the result

    while (agg_vectorizedHashMapIter.hasNext()) {
      wholestagecodegen_numOutputRows.add(1);
      org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row wholestagecodegen_vectorizedHashMapRow =
        (org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row)
        agg_vectorizedHashMapIter.next();

      wholestagecodegen_holder.reset();

      wholestagecodegen_rowWriter.zeroOutNullBytes();

      boolean wholestagecodegen_isNull = wholestagecodegen_vectorizedHashMapRow.isNullAt(0);
      UTF8String wholestagecodegen_value = wholestagecodegen_isNull ? null : (wholestagecodegen_vectorizedHashMapRow.getUTF8String(0));
      if (wholestagecodegen_isNull) {
        wholestagecodegen_rowWriter.setNullAt(0);
      } else {
        wholestagecodegen_rowWriter.write(0, wholestagecodegen_value);
      }

      boolean wholestagecodegen_isNull1 = wholestagecodegen_vectorizedHashMapRow.isNullAt(1);
      UTF8String wholestagecodegen_value1 = wholestagecodegen_isNull1 ? null : (wholestagecodegen_vectorizedHashMapRow.getUTF8String(1));
      if (wholestagecodegen_isNull1) {
        wholestagecodegen_rowWriter.setNullAt(1);
      } else {
        wholestagecodegen_rowWriter.write(1, wholestagecodegen_value1);
      }

      long wholestagecodegen_value2 = wholestagecodegen_vectorizedHashMapRow.getLong(2);
      wholestagecodegen_rowWriter.write(2, wholestagecodegen_value2);
      wholestagecodegen_result.setTotalSize(wholestagecodegen_holder.totalSize());

      append(wholestagecodegen_result);

      if (shouldStop()) return;
    }

    agg_vectorizedHashMap.close();

    while (agg_mapIter.next()) {
      wholestagecodegen_numOutputRows.add(1);
      UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
      UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();

      UnsafeRow agg_resultRow = agg_unsafeRowJoiner.join(agg_aggKey, agg_aggBuffer);

      append(agg_resultRow);

      if (shouldStop()) return;
    }

    agg_mapIter.close();
    if (agg_sorter == null) {
      agg_hashMap.free();
    }
  }
}
{code}
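Subtree 3 generates the same vectorized map as subtree 1 (only the scan/filter columns differ); its hash() mixes the bytes of the two string keys with the 0x9e3779b9 "golden ratio" constant, first per key and then folded into a running long. A standalone sketch of that hash (hypothetical class, not Spark code):

{code}
public class GoldenHashSketch {
  // Byte-wise golden-ratio mix, as in the generated hash() inner loops.
  static int mixBytes(byte[] bytes) {
    int result = 0;
    for (byte b : bytes) {
      result = (result ^ 0x9e3779b9) + b + (result << 6) + (result >>> 2);
    }
    return result;
  }

  // Fold each key's mixed value into a running long, same formula again.
  static long hashKeys(byte[] key0, byte[] key1) {
    long hash = 0;
    hash = (hash ^ 0x9e3779b9) + mixBytes(key0) + (hash << 6) + (hash >>> 2);
    hash = (hash ^ 0x9e3779b9) + mixBytes(key1) + (hash << 6) + (hash >>> 2);
    return hash;
  }

  public static void main(String[] args) {
    // The low bits of this value pick the bucket: (int) h & (numBuckets - 1).
    System.out.println(hashKeys("ORD".getBytes(), "AA".getBytes()));
  }
}
{code}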
{code}
== Subtree 4 / 5 ==
*Sort [Origin#155 ASC, UniqueCarrier#147 ASC], false, 0
+- *HashAggregate(key=[Origin#155,UniqueCarrier#147], functions=[count(1)], output=[Origin#155,UniqueCarrier#147,total#97L])
   +- Exchange hashpartitioning(Origin#155, UniqueCarrier#147, 200)
      +- *HashAggregate(key=[Origin#155,UniqueCarrier#147], functions=[partial_count(1)], output=[Origin#155,UniqueCarrier#147,count#303L])
         +- *Project [UniqueCarrier#147, Origin#155]
            +- *Filter (((isnotnull(UniqueCarrier#147) && isnotnull(Origin#155)) && isnotnull(Dest#156)) && (Dest#156 = ORD))
               +- *Scan csv [UniqueCarrier#147,Origin#155,Dest#156] Format: CSV, InputPaths: file:/home/robbins/brandberry/2008.csv, PushedFilters: [IsNotNull(UniqueCarrier), IsNotNull(Origin), IsNotNull(Dest), EqualTo(Dest,ORD)], ReadSchema: struct<UniqueCarrier:string,Origin:string,Dest:string>

Generated code:
public Object generate(Object[] references) {
  return new GeneratedIterator(references);
}

final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
  private Object[] references;
  private boolean sort_needToSort;
  private org.apache.spark.sql.execution.SortExec sort_plan;
  private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
  private org.apache.spark.executor.TaskMetrics sort_metrics;
  private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
  private boolean agg_initAgg;
  private boolean agg_bufIsNull;
  private long agg_bufValue;
  private org.apache.spark.sql.execution.aggregate.HashAggregateExec agg_plan;
  private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
  private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
  private org.apache.spark.unsafe.KVIterator agg_mapIter;
  private org.apache.spark.sql.execution.metric.SQLMetric agg_peakMemory;
  private org.apache.spark.sql.execution.metric.SQLMetric agg_spillSize;
  private scala.collection.Iterator inputadapter_input;
  private UnsafeRow agg_result;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
  private UnsafeRow agg_result1;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder1;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter1;
  private org.apache.spark.sql.execution.metric.SQLMetric sort_numOutputRows;
  private org.apache.spark.sql.execution.metric.SQLMetric sort_aggTime;
  private org.apache.spark.sql.execution.metric.SQLMetric sort_peakMemory;
  private org.apache.spark.sql.execution.metric.SQLMetric sort_spillSize;
  private org.apache.spark.sql.execution.metric.SQLMetric sort_sortTime;

  public GeneratedIterator(Object[] references) {
    this.references = references;
  }

  public void init(int index, scala.collection.Iterator inputs[]) {
    partitionIndex = index;
    sort_needToSort = true;
    this.sort_plan = (org.apache.spark.sql.execution.SortExec) references[0];
    sort_sorter = sort_plan.createSorter();
    sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();

    agg_initAgg = false;

    this.agg_plan = (org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[1];

    this.agg_peakMemory = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
    this.agg_spillSize = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
    inputadapter_input = inputs[0];
    agg_result = new UnsafeRow(2);
    this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 64);
    this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 2);
    agg_result1 = new UnsafeRow(3);
    this.agg_holder1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result1, 64);
    this.agg_rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder1, 3);
    this.sort_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[4];
    this.sort_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[5];
    this.sort_peakMemory = (org.apache.spark.sql.execution.metric.SQLMetric) references[6];
    this.sort_spillSize = (org.apache.spark.sql.execution.metric.SQLMetric) references[7];
    this.sort_sortTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[8];
  }

  private void agg_doAggregateWithKeys() throws java.io.IOException {
    agg_hashMap = agg_plan.createHashMap();

    while (inputadapter_input.hasNext()) {
      InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
      boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
      UTF8String inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getUTF8String(0));
      boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
      UTF8String inputadapter_value1 = inputadapter_isNull1 ? null : (inputadapter_row.getUTF8String(1));
      long inputadapter_value2 = inputadapter_row.getLong(2);

      UnsafeRow agg_unsafeRowAggBuffer = null;
      org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row agg_vectorizedAggBuffer = null;

      if (agg_vectorizedAggBuffer == null) {
        // generate grouping key
        agg_holder.reset();

        agg_rowWriter.zeroOutNullBytes();

        if (inputadapter_isNull) {
          agg_rowWriter.setNullAt(0);
        } else {
          agg_rowWriter.write(0, inputadapter_value);
        }

        if (inputadapter_isNull1) {
          agg_rowWriter.setNullAt(1);
        } else {
          agg_rowWriter.write(1, inputadapter_value1);
        }
        agg_result.setTotalSize(agg_holder.totalSize());
        int agg_value6 = 42;

        if (!inputadapter_isNull) {
          agg_value6 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value.getBaseObject(), inputadapter_value.getBaseOffset(), inputadapter_value.numBytes(), agg_value6);
        }

        if (!inputadapter_isNull1) {
          agg_value6 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value1.getBaseObject(), inputadapter_value1.getBaseOffset(), inputadapter_value1.numBytes(), agg_value6);
        }
        if (true) {
          // try to get the buffer from hash map
          agg_unsafeRowAggBuffer =
            agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value6);
        }
        if (agg_unsafeRowAggBuffer == null) {
          if (agg_sorter == null) {
            agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
          } else {
            agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
          }

          // the hash map had be spilled, it should have enough memory now,
          // try to allocate buffer again.
          agg_unsafeRowAggBuffer =
            agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value6);
          if (agg_unsafeRowAggBuffer == null) {
            // failed to allocate the first page
            throw new OutOfMemoryError("No enough memory for aggregation");
          }
        }
      }

      if (agg_vectorizedAggBuffer != null) {
        // update vectorized row

      } else {
        // update unsafe row

        // common sub-expressions

        // evaluate aggregate function
        long agg_value10 = agg_unsafeRowAggBuffer.getLong(0);

        long agg_value9 = -1L;
        agg_value9 = agg_value10 + inputadapter_value2;
        // update unsafe row buffer
        agg_unsafeRowAggBuffer.setLong(0, agg_value9);

      }
      if (shouldStop()) return;
    }

    agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter, agg_peakMemory, agg_spillSize);
  }

  private void sort_addToSorter() throws java.io.IOException {
    if (!agg_initAgg) {
      agg_initAgg = true;
      long sort_beforeAgg = System.nanoTime();
      agg_doAggregateWithKeys();
      sort_aggTime.add((System.nanoTime() - sort_beforeAgg) / 1000000);
    }

    // output the result

    while (agg_mapIter.next()) {
      sort_numOutputRows.add(1);
      UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
      UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();

      boolean agg_isNull11 = agg_aggKey.isNullAt(0);
      UTF8String agg_value12 = agg_isNull11 ? null : (agg_aggKey.getUTF8String(0));
      boolean agg_isNull12 = agg_aggKey.isNullAt(1);
      UTF8String agg_value13 = agg_isNull12 ? null : (agg_aggKey.getUTF8String(1));
      long agg_value14 = agg_aggBuffer.getLong(0);

      agg_holder1.reset();

      agg_rowWriter1.zeroOutNullBytes();

      if (agg_isNull11) {
        agg_rowWriter1.setNullAt(0);
      } else {
        agg_rowWriter1.write(0, agg_value12);
      }

      if (agg_isNull12) {
        agg_rowWriter1.setNullAt(1);
      } else {
        agg_rowWriter1.write(1, agg_value13);
      }

      agg_rowWriter1.write(2, agg_value14);
      agg_result1.setTotalSize(agg_holder1.totalSize());
      sort_sorter.insertRow((UnsafeRow)agg_result1);

      if (shouldStop()) return;
    }

    agg_mapIter.close();
    if (agg_sorter == null) {
      agg_hashMap.free();
    }

  }

  protected void processNext() throws java.io.IOException {
    if (sort_needToSort) {
      long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
      sort_addToSorter();
      sort_sortedIter = sort_sorter.sort();
      sort_sortTime.add(sort_sorter.getSortTimeNanos() / 1000000);
      sort_peakMemory.add(sort_sorter.getPeakMemoryUsage());
      sort_spillSize.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
      sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
      sort_needToSort = false;
    }

    while (sort_sortedIter.hasNext()) {
      UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();

      append(sort_outputRow);

      if (shouldStop()) return;
    }
  }
}
{code}
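Subtrees 4 and 5 run after the exchange: they merge the partial counts into a final count per (Origin, UniqueCarrier) key (note the update is buffer += inputadapter_value2, not += 1) and feed the rows to UnsafeExternalRowSorter, so each side of the sort-merge join arrives ordered. A toy sketch of that merge-then-sort step (hypothetical class, plain Java collections standing in for Spark's unsafe map and sorter):

{code}
import java.util.Map;
import java.util.TreeMap;

public class FinalAggSortSketch {
  public static void main(String[] args) {
    // TreeMap keeps keys ordered, standing in for hash-aggregate plus sort.
    Map<String, Long> totals = new TreeMap<>();
    // (key, partial count) pairs as they would arrive from the shuffle.
    String[][] partials = { {"ORD|AA", "3"}, {"ORD|UA", "5"}, {"ORD|AA", "4"} };
    for (String[] p : partials) {
      // final count(1) merge: buffer += partial count from the map side
      totals.merge(p[0], Long.parseLong(p[1]), Long::sum);
    }
    totals.forEach((k, v) -> System.out.println(k + " -> " + v)); // ORD|AA -> 7, ORD|UA -> 5
  }
}
{code}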
null : (agg_aggKey.getUTF8String(1)); /* 171 */ long agg_value14 = agg_aggBuffer.getLong(0); /* 172 */ /* 173 */ agg_holder1.reset(); /* 174 */ /* 175 */ agg_rowWriter1.zeroOutNullBytes(); /* 176 */ /* 177 */ if (agg_isNull11) { /* 178 */ agg_rowWriter1.setNullAt(0); /* 179 */ } else { /* 180 */ agg_rowWriter1.write(0, agg_value12); /* 181 */ } /* 182 */ /* 183 */ if (agg_isNull12) { /* 184 */ agg_rowWriter1.setNullAt(1); /* 185 */ } else { /* 186 */ agg_rowWriter1.write(1, agg_value13); /* 187 */ } /* 188 */ /* 189 */ agg_rowWriter1.write(2, agg_value14); /* 190 */ agg_result1.setTotalSize(agg_holder1.totalSize()); /* 191 */ sort_sorter.insertRow((UnsafeRow)agg_result1); /* 192 */ /* 193 */ if (shouldStop()) return; /* 194 */ } /* 195 */ /* 196 */ agg_mapIter.close(); /* 197 */ if (agg_sorter == null) { /* 198 */ agg_hashMap.free(); /* 199 */ } /* 200 */ /* 201 */ } /* 202 */ /* 203 */ protected void processNext() throws java.io.IOException { /* 204 */ if (sort_needToSort) { /* 205 */ long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled(); /* 206 */ sort_addToSorter(); /* 207 */ sort_sortedIter = sort_sorter.sort(); /* 208 */ sort_sortTime.add(sort_sorter.getSortTimeNanos() / 1000000); /* 209 */ sort_peakMemory.add(sort_sorter.getPeakMemoryUsage()); /* 210 */ sort_spillSize.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore); /* 211 */ sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage()); /* 212 */ sort_needToSort = false; /* 213 */ } /* 214 */ /* 215 */ while (sort_sortedIter.hasNext()) { /* 216 */ UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next(); /* 217 */ /* 218 */ append(sort_outputRow); /* 219 */ /* 220 */ if (shouldStop()) return; /* 221 */ } /* 222 */ } /* 223 */ } == Subtree 5 / 5 == *Sort [Origin#16 ASC, UniqueCarrier#8 ASC], false, 0 +- *HashAggregate(key=[Origin#16,UniqueCarrier#8], functions=[count(1)], output=[Origin#16,UniqueCarrier#8,count#134L]) +- Exchange hashpartitioning(Origin#16, UniqueCarrier#8, 200) +- *HashAggregate(key=[Origin#16,UniqueCarrier#8], functions=[partial_count(1)], output=[Origin#16,UniqueCarrier#8,count#296L]) +- *Project [UniqueCarrier#8, Origin#16] +- *Filter (((((((isnotnull(Origin#16) && isnotnull(UniqueCarrier#8)) && isnotnull(Cancelled#21)) && isnotnull(CancellationCode#22)) && NOT (Cancelled#21 = 0)) && (CancellationCode#22 = A)) && isnotnull(Dest#17)) && (Dest#17 = ORD)) +- *Scan csv [UniqueCarrier#8,Origin#16,Dest#17,Cancelled#21,CancellationCode#22] Format: CSV, InputPaths: file:/home/robbins/brandberry/2008.csv, PushedFilters: [IsNotNull(Origin), IsNotNull(UniqueCarrier), IsNotNull(Cancelled), IsNotNull(CancellationCode), ..., ReadSchema: struct<UniqueCarrier:string,Origin:string,Dest:string,Cancelled:int,CancellationCode:string> Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private boolean sort_needToSort; /* 008 */ private org.apache.spark.sql.execution.SortExec sort_plan; /* 009 */ private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter; /* 010 */ private org.apache.spark.executor.TaskMetrics sort_metrics; /* 011 */ private scala.collection.Iterator<UnsafeRow> sort_sortedIter; /* 012 */ private boolean agg_initAgg; /* 013 */ private boolean agg_bufIsNull; /* 014 */ private long agg_bufValue; /* 015 */ private 
/* 016 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 017 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
/* 018 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 019 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_peakMemory;
/* 020 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_spillSize;
/* 021 */   private scala.collection.Iterator inputadapter_input;
/* 022 */   private UnsafeRow agg_result;
/* 023 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 024 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 025 */   private UnsafeRow agg_result1;
/* 026 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder1;
/* 027 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter1;
/* 028 */   private org.apache.spark.sql.execution.metric.SQLMetric sort_numOutputRows;
/* 029 */   private org.apache.spark.sql.execution.metric.SQLMetric sort_aggTime;
/* 030 */   private org.apache.spark.sql.execution.metric.SQLMetric sort_peakMemory;
/* 031 */   private org.apache.spark.sql.execution.metric.SQLMetric sort_spillSize;
/* 032 */   private org.apache.spark.sql.execution.metric.SQLMetric sort_sortTime;
/* 033 */
/* 034 */   public GeneratedIterator(Object[] references) {
/* 035 */     this.references = references;
/* 036 */   }
/* 037 */
/* 038 */   public void init(int index, scala.collection.Iterator inputs[]) {
/* 039 */     partitionIndex = index;
/* 040 */     sort_needToSort = true;
/* 041 */     this.sort_plan = (org.apache.spark.sql.execution.SortExec) references[0];
/* 042 */     sort_sorter = sort_plan.createSorter();
/* 043 */     sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 044 */
/* 045 */     agg_initAgg = false;
/* 046 */
/* 047 */     this.agg_plan = (org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[1];
/* 048 */
/* 049 */     this.agg_peakMemory = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 050 */     this.agg_spillSize = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 051 */     inputadapter_input = inputs[0];
/* 052 */     agg_result = new UnsafeRow(2);
/* 053 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 64);
/* 054 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 2);
/* 055 */     agg_result1 = new UnsafeRow(3);
/* 056 */     this.agg_holder1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result1, 64);
/* 057 */     this.agg_rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder1, 3);
/* 058 */     this.sort_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[4];
/* 059 */     this.sort_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[5];
/* 060 */     this.sort_peakMemory = (org.apache.spark.sql.execution.metric.SQLMetric) references[6];
/* 061 */     this.sort_spillSize = (org.apache.spark.sql.execution.metric.SQLMetric) references[7];
/* 062 */     this.sort_sortTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[8];
/* 063 */   }
/* 064 */
/* 065 */   private void agg_doAggregateWithKeys() throws java.io.IOException {
/* 066 */     agg_hashMap = agg_plan.createHashMap();
/* 067 */
/* 068 */     while (inputadapter_input.hasNext()) {
/* 069 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 070 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 071 */       UTF8String inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getUTF8String(0));
/* 072 */       boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 073 */       UTF8String inputadapter_value1 = inputadapter_isNull1 ? null : (inputadapter_row.getUTF8String(1));
/* 074 */       long inputadapter_value2 = inputadapter_row.getLong(2);
/* 075 */
/* 076 */       UnsafeRow agg_unsafeRowAggBuffer = null;
/* 077 */       org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row agg_vectorizedAggBuffer = null;
/* 078 */
/* 079 */       if (agg_vectorizedAggBuffer == null) {
/* 080 */         // generate grouping key
/* 081 */         agg_holder.reset();
/* 082 */
/* 083 */         agg_rowWriter.zeroOutNullBytes();
/* 084 */
/* 085 */         if (inputadapter_isNull) {
/* 086 */           agg_rowWriter.setNullAt(0);
/* 087 */         } else {
/* 088 */           agg_rowWriter.write(0, inputadapter_value);
/* 089 */         }
/* 090 */
/* 091 */         if (inputadapter_isNull1) {
/* 092 */           agg_rowWriter.setNullAt(1);
/* 093 */         } else {
/* 094 */           agg_rowWriter.write(1, inputadapter_value1);
/* 095 */         }
/* 096 */         agg_result.setTotalSize(agg_holder.totalSize());
/* 097 */         int agg_value6 = 42;
/* 098 */
/* 099 */         if (!inputadapter_isNull) {
/* 100 */           agg_value6 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value.getBaseObject(), inputadapter_value.getBaseOffset(), inputadapter_value.numBytes(), agg_value6);
/* 101 */         }
/* 102 */
/* 103 */         if (!inputadapter_isNull1) {
/* 104 */           agg_value6 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value1.getBaseObject(), inputadapter_value1.getBaseOffset(), inputadapter_value1.numBytes(), agg_value6);
/* 105 */         }
/* 106 */         if (true) {
/* 107 */           // try to get the buffer from hash map
/* 108 */           agg_unsafeRowAggBuffer =
/* 109 */             agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value6);
/* 110 */         }
/* 111 */         if (agg_unsafeRowAggBuffer == null) {
/* 112 */           if (agg_sorter == null) {
/* 113 */             agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
/* 114 */           } else {
/* 115 */             agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 116 */           }
/* 117 */
/* 118 */           // the hash map had be spilled, it should have enough memory now,
/* 119 */           // try to allocate buffer again.
/* 120 */           agg_unsafeRowAggBuffer =
/* 121 */             agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value6);
/* 122 */           if (agg_unsafeRowAggBuffer == null) {
/* 123 */             // failed to allocate the first page
/* 124 */             throw new OutOfMemoryError("No enough memory for aggregation");
/* 125 */           }
/* 126 */         }
/* 127 */       }
/* 128 */
/* 129 */       if (agg_vectorizedAggBuffer != null) {
/* 130 */         // update vectorized row
/* 131 */
/* 132 */       } else {
/* 133 */         // update unsafe row
/* 134 */
/* 135 */         // common sub-expressions
/* 136 */
/* 137 */         // evaluate aggregate function
/* 138 */         long agg_value10 = agg_unsafeRowAggBuffer.getLong(0);
/* 139 */
/* 140 */         long agg_value9 = -1L;
/* 141 */         agg_value9 = agg_value10 + inputadapter_value2;
/* 142 */         // update unsafe row buffer
/* 143 */         agg_unsafeRowAggBuffer.setLong(0, agg_value9);
/* 144 */
/* 145 */       }
/* 146 */       if (shouldStop()) return;
/* 147 */     }
/* 148 */
/* 149 */     agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter, agg_peakMemory, agg_spillSize);
/* 150 */   }
/* 151 */
/* 152 */   private void sort_addToSorter() throws java.io.IOException {
/* 153 */     if (!agg_initAgg) {
/* 154 */       agg_initAgg = true;
/* 155 */       long sort_beforeAgg = System.nanoTime();
/* 156 */       agg_doAggregateWithKeys();
/* 157 */       sort_aggTime.add((System.nanoTime() - sort_beforeAgg) / 1000000);
/* 158 */     }
/* 159 */
/* 160 */     // output the result
/* 161 */
/* 162 */     while (agg_mapIter.next()) {
/* 163 */       sort_numOutputRows.add(1);
/* 164 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 165 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 166 */
/* 167 */       boolean agg_isNull11 = agg_aggKey.isNullAt(0);
/* 168 */       UTF8String agg_value12 = agg_isNull11 ? null : (agg_aggKey.getUTF8String(0));
/* 169 */       boolean agg_isNull12 = agg_aggKey.isNullAt(1);
/* 170 */       UTF8String agg_value13 = agg_isNull12 ? null : (agg_aggKey.getUTF8String(1));
/* 171 */       long agg_value14 = agg_aggBuffer.getLong(0);
/* 172 */
/* 173 */       agg_holder1.reset();
/* 174 */
/* 175 */       agg_rowWriter1.zeroOutNullBytes();
/* 176 */
/* 177 */       if (agg_isNull11) {
/* 178 */         agg_rowWriter1.setNullAt(0);
/* 179 */       } else {
/* 180 */         agg_rowWriter1.write(0, agg_value12);
/* 181 */       }
/* 182 */
/* 183 */       if (agg_isNull12) {
/* 184 */         agg_rowWriter1.setNullAt(1);
/* 185 */       } else {
/* 186 */         agg_rowWriter1.write(1, agg_value13);
/* 187 */       }
/* 188 */
/* 189 */       agg_rowWriter1.write(2, agg_value14);
/* 190 */       agg_result1.setTotalSize(agg_holder1.totalSize());
/* 191 */       sort_sorter.insertRow((UnsafeRow)agg_result1);
/* 192 */
/* 193 */       if (shouldStop()) return;
/* 194 */     }
/* 195 */
/* 196 */     agg_mapIter.close();
/* 197 */     if (agg_sorter == null) {
/* 198 */       agg_hashMap.free();
/* 199 */     }
/* 200 */
/* 201 */   }
/* 202 */
/* 203 */   protected void processNext() throws java.io.IOException {
/* 204 */     if (sort_needToSort) {
/* 205 */       long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 206 */       sort_addToSorter();
/* 207 */       sort_sortedIter = sort_sorter.sort();
/* 208 */       sort_sortTime.add(sort_sorter.getSortTimeNanos() / 1000000);
/* 209 */       sort_peakMemory.add(sort_sorter.getPeakMemoryUsage());
/* 210 */       sort_spillSize.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
/* 211 */       sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 212 */       sort_needToSort = false;
/* 213 */     }
/* 214 */
/* 215 */     while (sort_sortedIter.hasNext()) {
/* 216 */       UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 217 */
/* 218 */       append(sort_outputRow);
/* 219 */
/* 220 */       if (shouldStop()) return;
/* 221 */     }
/* 222 */   }
/* 223 */ }
{code}
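For reference, a codegen dump like the one above can be printed with Spark's SQL debug helpers. A minimal sketch (Spark 2.x API; {{resultDf}} is a placeholder for the Dataset of the query in question):

{code}
import org.apache.spark.sql.execution.debug._

// Prints each WholeStageCodegen subtree of the physical plan along with the
// Java source generated for it ("Found N WholeStageCodegen subtrees.").
resultDf.debugCodegen()
{code}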
> segmentation violation in o.a.s.unsafe.types.UTF8String
> --------------------------------------------------------
>
>                 Key: SPARK-15822
>                 URL: https://issues.apache.org/jira/browse/SPARK-15822
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>         Environment: linux amd64
> openjdk version "1.8.0_91"
> OpenJDK Runtime Environment (build 1.8.0_91-b14)
> OpenJDK 64-Bit Server VM (build 25.91-b14, mixed mode)
>            Reporter: Pete Robbins
>            Assignee: Herman van Hovell
>            Priority: Blocker
>
> Executors fail with a segmentation violation while running an application with
> spark.memory.offHeap.enabled true
> spark.memory.offHeap.size 512m
> Also now reproduced with
> spark.memory.offHeap.enabled false
> {noformat}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> # SIGSEGV (0xb) at pc=0x00007f4559b4d4bd, pid=14182, tid=139935319750400
> #
> # JRE version: OpenJDK Runtime Environment (8.0_91-b14) (build 1.8.0_91-b14)
> # Java VM: OpenJDK 64-Bit Server VM (25.91-b14 mixed mode linux-amd64 compressed oops)
> # Problematic frame:
> # J 4816 C2 org.apache.spark.unsafe.types.UTF8String.compareTo(Lorg/apache/spark/unsafe/types/UTF8String;)I (64 bytes) @ 0x00007f4559b4d4bd [0x00007f4559b4d460+0x5d]
> {noformat}
> We initially saw this with IBM Java on a PowerPC box, but it is recreatable on Linux with OpenJDK.
> On Linux with IBM Java 8 we see a null pointer exception at the same code point:
> {noformat}
> 16/06/08 11:14:58 ERROR Executor: Exception in task 1.0 in stage 5.0 (TID 48)
> java.lang.NullPointerException
>         at org.apache.spark.unsafe.types.UTF8String.compareTo(UTF8String.java:831)
>         at org.apache.spark.unsafe.types.UTF8String.compare(UTF8String.java:844)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.findNextInnerJoinRows$(Unknown Source)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$doExecute$2$$anon$2.hasNext(WholeStageCodegenExec.scala:377)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
>         at org.spark_project.guava.collect.Ordering.leastOf(Ordering.java:664)
>         at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
>         at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1365)
>         at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1362)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:757)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:757)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>         at org.apache.spark.scheduler.Task.run(Task.scala:85)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.lang.Thread.run(Thread.java:785)
> {noformat}
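For completeness, a minimal sketch of the settings and query shape described in the report above. The column names and filters are taken from the plan in the dump; the file path, read options, and final action are assumptions, not the exact application:

{code}
import org.apache.spark.sql.SparkSession

// Off-heap settings from the report; the crash was later reproduced with
// spark.memory.offHeap.enabled=false as well.
val spark = SparkSession.builder()
  .appName("SPARK-15822-repro")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "512m")
  .getOrCreate()
import spark.implicits._

// Airline on-time performance CSV (e.g. 2008.csv); options are assumed.
val flights = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/path/to/2008.csv")

// Same filter/aggregate/sort shape as the plan above: flights into ORD,
// cancelled with carrier code A, counted per (Origin, UniqueCarrier).
val cancellations = flights
  .filter($"Dest" === "ORD" && $"Cancelled" =!= 0 && $"CancellationCode" === "A")
  .groupBy($"Origin", $"UniqueCarrier")
  .count()
  .orderBy($"Origin", $"UniqueCarrier")

cancellations.show()
{code}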