[ 
https://issues.apache.org/jira/browse/SPARK-15822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331371#comment-15331371
 ] 

Pete Robbins commented on SPARK-15822:
--------------------------------------

The generated code is:

{code}
Top Arrival Carrier Cancellations:
Found 5 WholeStageCodegen subtrees.
== Subtree 1 / 5 ==
*HashAggregate(key=[Origin#16,UniqueCarrier#8], functions=[partial_count(1)], 
output=[Origin#16,UniqueCarrier#8,count#296L])
+- *Project [UniqueCarrier#8, Origin#16]
   +- *Filter (((((((isnotnull(Origin#16) && isnotnull(UniqueCarrier#8)) && 
isnotnull(Cancelled#21)) && isnotnull(CancellationCode#22)) && NOT 
(Cancelled#21 = 0)) && (CancellationCode#22 = A)) && isnotnull(Dest#17)) && 
(Dest#17 = ORD))
      +- *Scan csv 
[UniqueCarrier#8,Origin#16,Dest#17,Cancelled#21,CancellationCode#22] Format: 
CSV, InputPaths: file:/home/robbins/brandberry/2008.csv, PushedFilters: 
[IsNotNull(Origin), IsNotNull(UniqueCarrier), IsNotNull(Cancelled), 
IsNotNull(CancellationCode), ..., ReadSchema: 
struct<UniqueCarrier:string,Origin:string,Dest:string,Cancelled:int,CancellationCode:string>

Generated code:
/*     */ // Entry point of this generated unit: wraps the supplied references array in a
/*     */ // new GeneratedIterator and returns it.
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private boolean agg_initAgg;
/* 008 */   private boolean agg_bufIsNull;
/* 009 */   private long agg_bufValue;
/* 010 */   private agg_VectorizedHashMap agg_vectorizedHashMap;
/* 011 */   private 
java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row> 
agg_vectorizedHashMapIter;
/* 012 */   private org.apache.spark.sql.execution.aggregate.HashAggregateExec 
agg_plan;
/* 013 */   private 
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 014 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter 
agg_sorter;
/* 015 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 016 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_peakMemory;
/* 017 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_spillSize;
/* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric 
scan_numOutputRows;
/* 019 */   private scala.collection.Iterator scan_input;
/* 020 */   private org.apache.spark.sql.execution.metric.SQLMetric 
filter_numOutputRows;
/* 021 */   private UnsafeRow filter_result;
/* 022 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
/* 023 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
filter_rowWriter;
/* 024 */   private UnsafeRow project_result;
/* 025 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
/* 026 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
project_rowWriter;
/* 027 */   private UnsafeRow agg_result2;
/* 028 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 029 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 030 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner 
agg_unsafeRowJoiner;
/* 031 */   private org.apache.spark.sql.execution.metric.SQLMetric 
wholestagecodegen_numOutputRows;
/* 032 */   private org.apache.spark.sql.execution.metric.SQLMetric 
wholestagecodegen_aggTime;
/* 033 */   private UnsafeRow wholestagecodegen_result;
/* 034 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
wholestagecodegen_holder;
/* 035 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
wholestagecodegen_rowWriter;
/* 036 */
/*     */   // Only stores the references array; init() and agg_doAggregateWithKeys() later
/*     */   // read individual elements from it by fixed index and cast them.
/* 037 */   public GeneratedIterator(Object[] references) {
/* 038 */     this.references = references;
/* 039 */   }
/* 040 */
/*     */   // Prepares the iterator for one partition: records the partition index, resets
/*     */   // the "aggregation done" flag, creates the vectorized hash map, pulls the plan and
/*     */   // metric objects out of references[], takes inputs[0] as the scan input, and
/*     */   // allocates the row / buffer-holder / row-writer triples used by later stages.
/*     */   // partitionIndex is not declared in this class; presumably it is inherited from
/*     */   // BufferedRowIterator - confirm.
/* 041 */   public void init(int index, scala.collection.Iterator inputs[]) {
/* 042 */     partitionIndex = index;
/* 043 */     agg_initAgg = false;
/* 044 */
/* 045 */     agg_vectorizedHashMap = new agg_VectorizedHashMap();
/* 046 */
/*     */     // references[0..4]: aggregate plan followed by four SQLMetric objects.
/*     */     // references[5] and [6] are not read here; agg_doAggregateWithKeys() uses them.
/* 047 */     this.agg_plan = 
(org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0];
/* 048 */
/* 049 */     this.agg_peakMemory = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 050 */     this.agg_spillSize = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 051 */     this.scan_numOutputRows = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 052 */     scan_input = inputs[0];
/* 053 */     this.filter_numOutputRows = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[4];
/*     */     // Filter stage output: 5-field row with a 128-byte initial buffer.
/* 054 */     filter_result = new UnsafeRow(5);
/* 055 */     this.filter_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 
128);
/* 056 */     this.filter_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder,
 5);
/*     */     // Project stage output: 2-field row with a 64-byte initial buffer.
/* 057 */     project_result = new UnsafeRow(2);
/* 058 */     this.project_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 
64);
/* 059 */     this.project_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder,
 2);
/*     */     // 2-field row that agg_doAggregateWithKeys() fills with the grouping key when it
/*     */     // falls back to agg_hashMap.
/* 060 */     agg_result2 = new UnsafeRow(2);
/* 061 */     this.agg_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result2, 64);
/* 062 */     this.agg_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 
2);
/* 063 */     agg_unsafeRowJoiner = agg_plan.createUnsafeJoiner();
/* 064 */     this.wholestagecodegen_numOutputRows = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[7];
/* 065 */     this.wholestagecodegen_aggTime = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[8];
/*     */     // 3-field row that processNext() fills from each vectorized hash map entry.
/* 066 */     wholestagecodegen_result = new UnsafeRow(3);
/* 067 */     this.wholestagecodegen_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(wholestagecodegen_result,
 64);
/* 068 */     this.wholestagecodegen_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(wholestagecodegen_holder,
 3);
/* 069 */   }
/* 070 */
/*     */   // Fixed-capacity hash map used as the first-level aggregation table. The key is a
/*     */   // pair of UTF8String values and the aggregation buffer is a single long ("count");
/*     */   // all three are stored as columns of one ColumnarBatch. A lookup examines at most
/*     */   // maxSteps consecutive buckets. When a key can neither be found nor inserted,
/*     */   // findOrInsert returns null and agg_doAggregateWithKeys() falls back to agg_hashMap.
/* 071 */   public class agg_VectorizedHashMap {
/* 072 */     private org.apache.spark.sql.execution.vectorized.ColumnarBatch 
batch;
/* 073 */     private org.apache.spark.sql.execution.vectorized.ColumnarBatch 
aggregateBufferBatch;
/*     */     // buckets[i] is a row index into batch, or -1 when the bucket is empty.
/* 074 */     private int[] buckets;
/*     */     // Row capacity both batches are allocated with: 1 << 16 = 65536.
/* 075 */     private int capacity = 1 << 16;
/* 076 */     private double loadFactor = 0.5;
/*     */     // 65536 / 0.5 = 131072, a power of two, so (numBuckets - 1) is usable as a bit mask.
/* 077 */     private int numBuckets = (int) (capacity / loadFactor);
/*     */     // Maximum number of buckets examined per findOrInsert call.
/* 078 */     private int maxSteps = 2;
/*     */     // Number of rows (distinct keys) inserted so far.
/* 079 */     private int numRows = 0;
/*     */     // Full row layout: two string key columns followed by the long count column.
/* 080 */     private org.apache.spark.sql.types.StructType schema = new 
org.apache.spark.sql.types.StructType().add("Origin", 
org.apache.spark.sql.types.DataTypes.StringType)
/* 081 */     .add("UniqueCarrier", 
org.apache.spark.sql.types.DataTypes.StringType)
/* 082 */     .add("count", org.apache.spark.sql.types.DataTypes.LongType);
/*     */     // Layout of the aggregation buffer alone: just the long count column.
/* 083 */     private org.apache.spark.sql.types.StructType 
aggregateBufferSchema =
/* 084 */     new org.apache.spark.sql.types.StructType().add("count", 
org.apache.spark.sql.types.DataTypes.LongType);
/* 085 */
/*     */     // Allocates both batches on heap and marks every bucket as empty (-1).
/* 086 */     public agg_VectorizedHashMap() {
/* 087 */       batch = 
org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
/* 088 */         org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
/* 089 */       // TODO: Possibly generate this projection in HashAggregate 
directly
/* 090 */       aggregateBufferBatch = 
org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
/* 091 */         aggregateBufferSchema, 
org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
/*     */       // Replace each buffer-batch column with batch's column at offset +2 (i.e. the
/*     */       // "count" column), so a row of aggregateBufferBatch reads and writes the
/*     */       // same column object that batch.column(2) exposes.
/* 092 */       for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
/* 093 */         aggregateBufferBatch.setColumn(i, batch.column(i+2));
/* 094 */       }
/* 095 */
/* 096 */       buckets = new int[numBuckets];
/* 097 */       java.util.Arrays.fill(buckets, -1);
/* 098 */     }
/* 099 */
/*     */     // Returns the aggregation-buffer row for (agg_key, agg_key1). If the key is not
/*     */     // present and an empty bucket is reached, a new row is appended with count 0.
/*     */     // Returns null when the batch already holds `capacity` rows, or when neither the
/*     */     // key nor an empty bucket is found within maxSteps probes.
/* 100 */     public 
org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row 
findOrInsert(UTF8String agg_key, UTF8String agg_key1) {
/* 101 */       long h = hash(agg_key, agg_key1);
/* 102 */       int step = 0;
/*     */       // The cast binds tighter than '&': h is truncated to int first, then masked.
/* 103 */       int idx = (int) h & (numBuckets - 1);
/* 104 */       while (step < maxSteps) {
/* 105 */         // Return bucket index if it's either an empty slot or 
already contains the key
/* 106 */         if (buckets[idx] == -1) {
/* 107 */           if (numRows < capacity) {
/* 108 */             // Initialize aggregate keys
/* 109 */             batch.column(0).putByteArray(numRows, agg_key.getBytes());
/* 110 */             batch.column(1).putByteArray(numRows, 
agg_key1.getBytes());
/* 111 */
/*     */             // agg_bufIsNull / agg_bufValue are fields of the enclosing
/*     */             // GeneratedIterator, used here as scratch for the initial buffer value.
/* 112 */             agg_bufIsNull = false;
/* 113 */             agg_bufValue = 0L;
/* 114 */
/* 115 */             // Initialize aggregate values
/* 116 */
/*     */             // agg_bufIsNull was just set to false, so the putLong branch is taken.
/* 117 */             if (!agg_bufIsNull) {
/* 118 */               batch.column(2).putLong(numRows, agg_bufValue);
/* 119 */             } else {
/* 120 */               batch.column(2).putNull(numRows);
/* 121 */             }
/* 122 */
/*     */             // Claim the bucket for the new row, then advance the row count.
/* 123 */             buckets[idx] = numRows++;
/* 124 */             batch.setNumRows(numRows);
/* 125 */             aggregateBufferBatch.setNumRows(numRows);
/* 126 */             return aggregateBufferBatch.getRow(buckets[idx]);
/* 127 */           } else {
/* 128 */             // No more space
/* 129 */             return null;
/* 130 */           }
/* 131 */         } else if (equals(idx, agg_key, agg_key1)) {
/* 132 */           return aggregateBufferBatch.getRow(buckets[idx]);
/* 133 */         }
/*     */         // Occupied by a different key: try the next bucket, wrapping via the mask.
/* 134 */         idx = (idx + 1) & (numBuckets - 1);
/* 135 */         step++;
/* 136 */       }
/* 137 */       // Didn't find it
/* 138 */       return null;
/* 139 */     }
/* 140 */
/*     */     // True when both key columns of the row stored in bucket idx equal the given keys.
/*     */     // Callers must only pass an occupied bucket (buckets[idx] != -1).
/* 141 */     private boolean equals(int idx, UTF8String agg_key, UTF8String 
agg_key1) {
/* 142 */       return 
(batch.column(0).getUTF8String(buckets[idx]).equals(agg_key)) && 
(batch.column(1).getUTF8String(buckets[idx]).equals(agg_key1));
/* 143 */     }
/* 144 */
/*     */     // Hashes the key pair: each key's bytes are folded into an int with the mixing
/*     */     // step (r ^ 0x9e3779b9) + b + (r << 6) + (r >>> 2), and the two per-key ints are
/*     */     // folded into the long result with the same step.
/*     */     // NOTE(review): getBytes() is invoked in the loop condition and in the loop body
/*     */     // on every iteration; if it copies the string's bytes this is a per-byte
/*     */     // allocation - confirm against UTF8String.
/* 145 */     private long hash(UTF8String agg_key, UTF8String agg_key1) {
/* 146 */       long agg_hash = 0;
/* 147 */
/* 148 */       int agg_result = 0;
/* 149 */       for (int i = 0; i < agg_key.getBytes().length; i++) {
/* 150 */         int agg_hash1 = agg_key.getBytes()[i];
/* 151 */         agg_result = (agg_result ^ (0x9e3779b9)) + agg_hash1 + 
(agg_result << 6) + (agg_result >>> 2);
/* 152 */       }
/* 153 */
/* 154 */       agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result + (agg_hash 
<< 6) + (agg_hash >>> 2);
/* 155 */
/* 156 */       int agg_result1 = 0;
/* 157 */       for (int i = 0; i < agg_key1.getBytes().length; i++) {
/* 158 */         int agg_hash2 = agg_key1.getBytes()[i];
/* 159 */         agg_result1 = (agg_result1 ^ (0x9e3779b9)) + agg_hash2 + 
(agg_result1 << 6) + (agg_result1 >>> 2);
/* 160 */       }
/* 161 */
/* 162 */       agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result1 + (agg_hash 
<< 6) + (agg_hash >>> 2);
/* 163 */
/* 164 */       return agg_hash;
/* 165 */     }
/* 166 */
/*     */     // Iterator over the rows of the full batch (keys plus count).
/* 167 */     public 
java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row>
/* 168 */     rowIterator() {
/* 169 */       return batch.rowIterator();
/* 170 */     }
/* 171 */
/*     */     // Closes the full batch only; aggregateBufferBatch is not closed here.
/* 172 */     public void close() {
/* 173 */       batch.close();
/* 174 */     }
/* 175 */
/* 176 */   }
/* 177 */
/*     */   // Consumes scan_input, applies the fused filter, and counts rows per grouping key.
/*     */   // Each surviving row is first offered to agg_vectorizedHashMap; if that returns
/*     */   // null the row goes to agg_hashMap, spilling to agg_sorter when no buffer can be
/*     */   // obtained. After the input is exhausted the two result iterators are set up.
/*     */   // Column ordinals presumably follow the scan's column list in the plan
/*     */   // ([UniqueCarrier, Origin, Dest, Cancelled, CancellationCode] -> 0..4) - confirm.
/*     */   // Throws OutOfMemoryError when a buffer cannot be obtained even after spilling.
/* 178 */   private void agg_doAggregateWithKeys() throws java.io.IOException {
/* 179 */     agg_hashMap = agg_plan.createHashMap();
/* 180 */
/* 181 */     while (scan_input.hasNext()) {
/* 182 */       InternalRow scan_row = (InternalRow) scan_input.next();
/* 183 */       scan_numOutputRows.add(1);
/*     */       // Column 3 (int): skip the row when it is null ...
/* 184 */       boolean scan_isNull8 = scan_row.isNullAt(3);
/* 185 */       int scan_value8 = scan_isNull8 ? -1 : (scan_row.getInt(3));
/* 186 */
/* 187 */       if (!(!(scan_isNull8))) continue;
/* 188 */
/*     */       // ... or when it equals 0 (i.e. keep only rows with column 3 != 0).
/* 189 */       boolean filter_value3 = false;
/* 190 */       filter_value3 = scan_value8 == 0;
/* 191 */       boolean filter_value2 = false;
/* 192 */       filter_value2 = !(filter_value3);
/* 193 */       if (!filter_value2) continue;
/*     */       // Column 4 (string): must be non-null and equal to the value produced by
/*     */       // the expression in references[5].
/* 194 */       boolean scan_isNull9 = scan_row.isNullAt(4);
/* 195 */       UTF8String scan_value9 = scan_isNull9 ? null : 
(scan_row.getUTF8String(4));
/* 196 */
/* 197 */       if (!(!(scan_isNull9))) continue;
/* 198 */
/* 199 */       Object filter_obj = ((Expression) references[5]).eval(null);
/* 200 */       UTF8String filter_value10 = (UTF8String) filter_obj;
/* 201 */       boolean filter_value8 = false;
/* 202 */       filter_value8 = scan_value9.equals(filter_value10);
/* 203 */       if (!filter_value8) continue;
/*     */       // Column 2 (string): must be non-null and equal to the value produced by
/*     */       // the expression in references[6].
/* 204 */       boolean scan_isNull7 = scan_row.isNullAt(2);
/* 205 */       UTF8String scan_value7 = scan_isNull7 ? null : 
(scan_row.getUTF8String(2));
/* 206 */
/* 207 */       if (!(!(scan_isNull7))) continue;
/* 208 */
/* 209 */       Object filter_obj1 = ((Expression) references[6]).eval(null);
/* 210 */       UTF8String filter_value15 = (UTF8String) filter_obj1;
/* 211 */       boolean filter_value13 = false;
/* 212 */       filter_value13 = scan_value7.equals(filter_value15);
/* 213 */       if (!filter_value13) continue;
/* 214 */
/*     */       // Columns 1 and 0 (strings) are the grouping keys; both must be non-null.
/* 215 */       boolean scan_isNull6 = scan_row.isNullAt(1);
/* 216 */       UTF8String scan_value6 = scan_isNull6 ? null : 
(scan_row.getUTF8String(1));
/* 217 */
/* 218 */       if (!(!(scan_isNull6))) continue;
/* 219 */
/* 220 */       boolean scan_isNull5 = scan_row.isNullAt(0);
/* 221 */       UTF8String scan_value5 = scan_isNull5 ? null : 
(scan_row.getUTF8String(0));
/* 222 */
/* 223 */       if (!(!(scan_isNull5))) continue;
/* 224 */
/* 225 */       filter_numOutputRows.add(1);
/* 226 */
/*     */       // At most one of these two buffers ends up non-null for the current row.
/* 227 */       UnsafeRow agg_unsafeRowAggBuffer = null;
/* 228 */       org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row 
agg_vectorizedAggBuffer = null;
/* 229 */
/*     */       // Constant conditions emitted by the generator; the lookup always runs.
/* 230 */       if (true) {
/* 231 */         if (!false && !false) {
/* 232 */           agg_vectorizedAggBuffer = 
agg_vectorizedHashMap.findOrInsert(
/* 233 */             scan_value6, scan_value5);
/* 234 */         }
/* 235 */       }
/* 236 */
/*     */       // Fallback path: the vectorized map had no room / no slot for this key.
/* 237 */       if (agg_vectorizedAggBuffer == null) {
/* 238 */         // generate grouping key
/* 239 */         agg_holder.reset();
/* 240 */
/* 241 */         agg_rowWriter.write(0, scan_value6);
/* 242 */
/* 243 */         agg_rowWriter.write(1, scan_value5);
/* 244 */         agg_result2.setTotalSize(agg_holder.totalSize());
/*     */         // Key hash: Murmur3 over the first key's bytes with seed 42, then over the
/*     */         // second key's bytes using the first result as the seed.
/* 245 */         int agg_value6 = 42;
/* 246 */
/* 247 */         if (!false) {
/* 248 */           agg_value6 = 
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(scan_value6.getBaseObject(),
 scan_value6.getBaseOffset(), scan_value6.numBytes(), agg_value6);
/* 249 */         }
/* 250 */
/* 251 */         if (!false) {
/* 252 */           agg_value6 = 
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(scan_value5.getBaseObject(),
 scan_value5.getBaseOffset(), scan_value5.numBytes(), agg_value6);
/* 253 */         }
/* 254 */         if (true) {
/* 255 */           // try to get the buffer from hash map
/* 256 */           agg_unsafeRowAggBuffer =
/* 257 */           agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result2, 
agg_value6);
/* 258 */         }
/*     */         // No buffer available: turn the current map contents into an external
/*     */         // sorter (merging into an existing one if present) and retry once.
/* 259 */         if (agg_unsafeRowAggBuffer == null) {
/* 260 */           if (agg_sorter == null) {
/* 261 */             agg_sorter = 
agg_hashMap.destructAndCreateExternalSorter();
/* 262 */           } else {
/* 263 */             
agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 264 */           }
/* 265 */
/* 266 */           // the hash map had be spilled, it should have enough 
memory now,
/* 267 */           // try  to allocate buffer again.
/* 268 */           agg_unsafeRowAggBuffer =
/* 269 */           agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result2, 
agg_value6);
/* 270 */           if (agg_unsafeRowAggBuffer == null) {
/* 271 */             // failed to allocate the first page
/* 272 */             throw new OutOfMemoryError("No enough memory for 
aggregation");
/* 273 */           }
/* 274 */         }
/* 275 */       }
/* 276 */
/*     */       // Increment the count (buffer field 0) in whichever buffer was obtained.
/* 277 */       if (agg_vectorizedAggBuffer != null) {
/* 278 */         // update vectorized row
/* 279 */
/* 280 */         // common sub-expressions
/* 281 */
/* 282 */         // evaluate aggregate function
/* 283 */         long agg_value10 = agg_vectorizedAggBuffer.getLong(0);
/* 284 */
/* 285 */         long agg_value9 = -1L;
/* 286 */         agg_value9 = agg_value10 + 1L;
/* 287 */         // update vectorized row
/* 288 */         agg_vectorizedAggBuffer.setLong(0, agg_value9);
/* 289 */
/* 290 */       } else {
/* 291 */         // update unsafe row
/* 292 */
/* 293 */         // common sub-expressions
/* 294 */
/* 295 */         // evaluate aggregate function
/* 296 */         long agg_value13 = agg_unsafeRowAggBuffer.getLong(0);
/* 297 */
/* 298 */         long agg_value12 = -1L;
/* 299 */         agg_value12 = agg_value13 + 1L;
/* 300 */         // update unsafe row buffer
/* 301 */         agg_unsafeRowAggBuffer.setLong(0, agg_value12);
/* 302 */
/* 303 */       }
/*     */       // NOTE(review): returning here skips the two iterator assignments below,
/*     */       // and processNext() has already set agg_initAgg = true, so this method is
/*     */       // not re-entered - confirm shouldStop() cannot be true during aggregation.
/* 304 */       if (shouldStop()) return;
/* 305 */     }
/* 306 */
/*     */     // Input exhausted: expose the results of both maps to processNext().
/* 307 */     agg_vectorizedHashMapIter = agg_vectorizedHashMap.rowIterator();
/* 308 */
/* 309 */     agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter, 
agg_peakMemory, agg_spillSize);
/* 310 */   }
/* 311 */
/*     */   // Runs the aggregation on the first call, then emits result rows: first every row
/*     */   // of the vectorized hash map, then every key/buffer pair from agg_mapIter. Returns
/*     */   // early whenever shouldStop() is true; since agg_initAgg stays true, a later call
/*     */   // skips the aggregation and continues draining the iterators.
/* 312 */   protected void processNext() throws java.io.IOException {
/* 313 */     if (!agg_initAgg) {
/* 314 */       agg_initAgg = true;
/*     */       // Elapsed aggregation time is recorded in milliseconds (ns / 1000000).
/* 315 */       long wholestagecodegen_beforeAgg = System.nanoTime();
/* 316 */       agg_doAggregateWithKeys();
/* 317 */       wholestagecodegen_aggTime.add((System.nanoTime() - 
wholestagecodegen_beforeAgg) / 1000000);
/* 318 */     }
/* 319 */
/* 320 */     // output the result
/* 321 */
/*     */     // Phase 1: copy each vectorized-map row (two string keys + long count) into
/*     */     // wholestagecodegen_result and append it.
/* 322 */     while (agg_vectorizedHashMapIter.hasNext()) {
/* 323 */       wholestagecodegen_numOutputRows.add(1);
/* 324 */       org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row 
wholestagecodegen_vectorizedHashMapRow =
/* 325 */       (org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row)
/* 326 */       agg_vectorizedHashMapIter.next();
/* 327 */
/* 328 */       wholestagecodegen_holder.reset();
/* 329 */
/* 330 */       wholestagecodegen_rowWriter.zeroOutNullBytes();
/* 331 */
/* 332 */       boolean wholestagecodegen_isNull = 
wholestagecodegen_vectorizedHashMapRow.isNullAt(0);
/* 333 */       UTF8String wholestagecodegen_value = wholestagecodegen_isNull ? 
null : (wholestagecodegen_vectorizedHashMapRow.getUTF8String(0));
/* 334 */       if (wholestagecodegen_isNull) {
/* 335 */         wholestagecodegen_rowWriter.setNullAt(0);
/* 336 */       } else {
/* 337 */         wholestagecodegen_rowWriter.write(0, wholestagecodegen_value);
/* 338 */       }
/* 339 */
/* 340 */       boolean wholestagecodegen_isNull1 = 
wholestagecodegen_vectorizedHashMapRow.isNullAt(1);
/* 341 */       UTF8String wholestagecodegen_value1 = wholestagecodegen_isNull1 
? null : (wholestagecodegen_vectorizedHashMapRow.getUTF8String(1));
/* 342 */       if (wholestagecodegen_isNull1) {
/* 343 */         wholestagecodegen_rowWriter.setNullAt(1);
/* 344 */       } else {
/* 345 */         wholestagecodegen_rowWriter.write(1, 
wholestagecodegen_value1);
/* 346 */       }
/* 347 */
/*     */       // The count column is read without a null check.
/* 348 */       long wholestagecodegen_value2 = 
wholestagecodegen_vectorizedHashMapRow.getLong(2);
/* 349 */       wholestagecodegen_rowWriter.write(2, wholestagecodegen_value2);
/* 350 */       
wholestagecodegen_result.setTotalSize(wholestagecodegen_holder.totalSize());
/* 351 */
/*     */       // The same wholestagecodegen_result object is appended each iteration
/*     */       // (no copy() here).
/* 352 */       append(wholestagecodegen_result);
/* 353 */
/* 354 */       if (shouldStop()) return;
/* 355 */     }
/* 356 */
/* 357 */     agg_vectorizedHashMap.close();
/* 358 */
/*     */     // Phase 2: emit the entries of the fallback map / sorter, joining each key row
/*     */     // with its aggregation buffer row.
/* 359 */     while (agg_mapIter.next()) {
/* 360 */       wholestagecodegen_numOutputRows.add(1);
/* 361 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 362 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 363 */
/* 364 */       UnsafeRow agg_resultRow = agg_unsafeRowJoiner.join(agg_aggKey, 
agg_aggBuffer);
/* 365 */
/* 366 */       append(agg_resultRow);
/* 367 */
/* 368 */       if (shouldStop()) return;
/* 369 */     }
/* 370 */
/* 371 */     agg_mapIter.close();
/*     */     // The hash map is freed here only when no external sorter was created.
/* 372 */     if (agg_sorter == null) {
/* 373 */       agg_hashMap.free();
/* 374 */     }
/* 375 */   }
/* 376 */ }

== Subtree 2 / 5 ==
*Project [Origin#16, UniqueCarrier#8, round((cast((count#134L * 100) as double) 
/ cast(total#97L as double)), 2) AS rank#173]
+- *SortMergeJoin [Origin#16, UniqueCarrier#8], [Origin#155, 
UniqueCarrier#147], Inner
   :- *Sort [Origin#16 ASC, UniqueCarrier#8 ASC], false, 0
   :  +- *HashAggregate(key=[Origin#16,UniqueCarrier#8], functions=[count(1)], 
output=[Origin#16,UniqueCarrier#8,count#134L])
   :     +- Exchange hashpartitioning(Origin#16, UniqueCarrier#8, 200)
   :        +- *HashAggregate(key=[Origin#16,UniqueCarrier#8], 
functions=[partial_count(1)], output=[Origin#16,UniqueCarrier#8,count#296L])
   :           +- *Project [UniqueCarrier#8, Origin#16]
   :              +- *Filter (((((((isnotnull(Origin#16) && 
isnotnull(UniqueCarrier#8)) && isnotnull(Cancelled#21)) && 
isnotnull(CancellationCode#22)) && NOT (Cancelled#21 = 0)) && 
(CancellationCode#22 = A)) && isnotnull(Dest#17)) && (Dest#17 = ORD))
   :                 +- *Scan csv 
[UniqueCarrier#8,Origin#16,Dest#17,Cancelled#21,CancellationCode#22] Format: 
CSV, InputPaths: file:/home/robbins/brandberry/2008.csv, PushedFilters: 
[IsNotNull(Origin), IsNotNull(UniqueCarrier), IsNotNull(Cancelled), 
IsNotNull(CancellationCode), ..., ReadSchema: 
struct<UniqueCarrier:string,Origin:string,Dest:string,Cancelled:int,CancellationCode:string>
   +- *Sort [Origin#155 ASC, UniqueCarrier#147 ASC], false, 0
      +- *HashAggregate(key=[Origin#155,UniqueCarrier#147], 
functions=[count(1)], output=[Origin#155,UniqueCarrier#147,total#97L])
         +- Exchange hashpartitioning(Origin#155, UniqueCarrier#147, 200)
            +- *HashAggregate(key=[Origin#155,UniqueCarrier#147], 
functions=[partial_count(1)], output=[Origin#155,UniqueCarrier#147,count#303L])
               +- *Project [UniqueCarrier#147, Origin#155]
                  +- *Filter (((isnotnull(UniqueCarrier#147) && 
isnotnull(Origin#155)) && isnotnull(Dest#156)) && (Dest#156 = ORD))
                     +- *Scan csv [UniqueCarrier#147,Origin#155,Dest#156] 
Format: CSV, InputPaths: file:/home/robbins/brandberry/2008.csv, PushedFilters: 
[IsNotNull(UniqueCarrier), IsNotNull(Origin), IsNotNull(Dest), 
EqualTo(Dest,ORD)], ReadSchema: 
struct<UniqueCarrier:string,Origin:string,Dest:string>

Generated code:
/*     */ // Entry point of this generated unit: wraps the supplied references array in a
/*     */ // new GeneratedIterator and returns it.
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator smj_leftInput;
/* 008 */   private scala.collection.Iterator smj_rightInput;
/* 009 */   private InternalRow smj_leftRow;
/* 010 */   private InternalRow smj_rightRow;
/* 011 */   private UTF8String smj_value4;
/* 012 */   private UTF8String smj_value5;
/* 013 */   private java.util.ArrayList smj_matches;
/* 014 */   private UTF8String smj_value6;
/* 015 */   private UTF8String smj_value7;
/* 016 */   private UTF8String smj_value8;
/* 017 */   private boolean smj_isNull4;
/* 018 */   private UTF8String smj_value9;
/* 019 */   private boolean smj_isNull5;
/* 020 */   private long smj_value10;
/* 021 */   private org.apache.spark.sql.execution.metric.SQLMetric 
smj_numOutputRows;
/* 022 */   private UnsafeRow smj_result;
/* 023 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder smj_holder;
/* 024 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter smj_rowWriter;
/* 025 */   private UnsafeRow project_result;
/* 026 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
/* 027 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
project_rowWriter;
/* 028 */
/*     */   // Only stores the references array; init() later reads references[0] from it.
/* 029 */   public GeneratedIterator(Object[] references) {
/* 030 */     this.references = references;
/* 031 */   }
/* 032 */
/*     */   // Prepares the join iterator for one partition: inputs[0] is the left side and
/*     */   // inputs[1] the right side; the match buffer starts empty and no right row is
/*     */   // pending. Also allocates the row / buffer-holder / row-writer triples.
/*     */   // partitionIndex is not declared in this class; presumably it is inherited from
/*     */   // BufferedRowIterator - confirm.
/* 033 */   public void init(int index, scala.collection.Iterator inputs[]) {
/* 034 */     partitionIndex = index;
/* 035 */     smj_leftInput = inputs[0];
/* 036 */     smj_rightInput = inputs[1];
/* 037 */
/* 038 */     smj_rightRow = null;
/* 039 */
/*     */     // Holds copies of the right-side rows matching the current left key.
/* 040 */     smj_matches = new java.util.ArrayList();
/* 041 */
/* 042 */     this.smj_numOutputRows = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/*     */     // 6-field row with a 128-byte initial buffer; not written to by the methods
/*     */     // visible in this class.
/* 043 */     smj_result = new UnsafeRow(6);
/* 044 */     this.smj_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(smj_result, 128);
/* 045 */     this.smj_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_holder, 
6);
/*     */     // 3-field output row that processNext() fills and appends (as a copy).
/* 046 */     project_result = new UnsafeRow(3);
/* 047 */     this.project_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 
64);
/* 048 */     this.project_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder,
 3);
/* 049 */   }
/* 050 */
/*     */   // Advances the merge join to the next left row that has at least one matching
/*     */   // right row. On a true return, smj_leftRow is that left row and smj_matches holds
/*     */   // copies of all right rows whose two key columns compare equal to it. Returns
/*     */   // false when the left input is exhausted, or when the right input is exhausted
/*     */   // and no matches are buffered. Rows with a null in either key column are skipped
/*     */   // on both sides. The comparisons assume both inputs arrive sorted by the two keys
/*     */   // (the plan places a Sort under each side).
/*     */   // NOTE(review): smj_value4..7 keep the UTF8String references obtained from the
/*     */   // rows without copying, whereas matched right rows are stored via copy(); whether
/*     */   // the rows' backing memory stays valid across iterator advances is not visible
/*     */   // here - confirm.
/* 051 */   private boolean findNextInnerJoinRows(
/* 052 */     scala.collection.Iterator leftIter,
/* 053 */     scala.collection.Iterator rightIter) {
/* 054 */     smj_leftRow = null;
/* 055 */     int comp = 0;
/* 056 */     while (smj_leftRow == null) {
/* 057 */       if (!leftIter.hasNext()) return false;
/* 058 */       smj_leftRow = (InternalRow) leftIter.next();
/* 059 */
/* 060 */       boolean smj_isNull = smj_leftRow.isNullAt(0);
/* 061 */       UTF8String smj_value = smj_isNull ? null : 
(smj_leftRow.getUTF8String(0));
/* 062 */
/* 063 */       boolean smj_isNull1 = smj_leftRow.isNullAt(1);
/* 064 */       UTF8String smj_value1 = smj_isNull1 ? null : 
(smj_leftRow.getUTF8String(1));
/*     */       // A left row with a null key is skipped; fetch the next one.
/* 065 */       if (smj_isNull || smj_isNull1) {
/* 066 */         smj_leftRow = null;
/* 067 */         continue;
/* 068 */       }
/*     */       // Matches are still buffered from the previous left row: if this left row has
/*     */       // the same key as the saved one (smj_value6 / smj_value7) reuse them as-is,
/*     */       // otherwise discard them.
/* 069 */       if (!smj_matches.isEmpty()) {
/* 070 */         comp = 0;
/* 071 */         if (comp == 0) {
/* 072 */           comp = smj_value.compare(smj_value6);
/* 073 */         }
/* 074 */         if (comp == 0) {
/* 075 */           comp = smj_value1.compare(smj_value7);
/* 076 */         }
/* 077 */
/* 078 */         if (comp == 0) {
/* 079 */           return true;
/* 080 */         }
/* 081 */         smj_matches.clear();
/* 082 */       }
/* 083 */
/*     */       // Advance the right side until it passes the current left key (or the left
/*     */       // row is dropped because the right side is already ahead of it).
/* 084 */       do {
/* 085 */         if (smj_rightRow == null) {
/*     */           // Right side exhausted: remember the current left key and report
/*     */           // whether any matches were collected for it.
/* 086 */           if (!rightIter.hasNext()) {
/* 087 */             smj_value6 = smj_value;
/* 088 */
/* 089 */             smj_value7 = smj_value1;
/* 090 */
/* 091 */             return !smj_matches.isEmpty();
/* 092 */           }
/* 093 */           smj_rightRow = (InternalRow) rightIter.next();
/* 094 */
/* 095 */           boolean smj_isNull2 = smj_rightRow.isNullAt(0);
/* 096 */           UTF8String smj_value2 = smj_isNull2 ? null : 
(smj_rightRow.getUTF8String(0));
/* 097 */
/* 098 */           boolean smj_isNull3 = smj_rightRow.isNullAt(1);
/* 099 */           UTF8String smj_value3 = smj_isNull3 ? null : 
(smj_rightRow.getUTF8String(1));
/*     */           // A right row with a null key is skipped; 'continue' re-tests the
/*     */           // do-while condition, which still holds, and fetches the next right row.
/* 100 */           if (smj_isNull2 || smj_isNull3) {
/* 101 */             smj_rightRow = null;
/* 102 */             continue;
/* 103 */           }
/* 104 */
/*     */           // Cache the pending right row's key in fields so it survives across calls.
/* 105 */           smj_value4 = smj_value2;
/* 106 */
/* 107 */           smj_value5 = smj_value3;
/* 108 */
/* 109 */         }
/* 110 */
/*     */         // Compare left key with the pending right key, first column then second.
/* 111 */         comp = 0;
/* 112 */         if (comp == 0) {
/* 113 */           comp = smj_value.compare(smj_value4);
/* 114 */         }
/* 115 */         if (comp == 0) {
/* 116 */           comp = smj_value1.compare(smj_value5);
/* 117 */         }
/* 118 */
/* 119 */         if (comp > 0) {
/*     */           // Left key is greater: this right row cannot match, drop it.
/* 120 */           smj_rightRow = null;
/* 121 */         } else if (comp < 0) {
/*     */           // Left key is smaller: the right side has moved past it. Either the
/*     */           // collected matches are complete, or this left row has none and is
/*     */           // dropped (the pending right row is kept for the next left row).
/* 122 */           if (!smj_matches.isEmpty()) {
/* 123 */             smj_value6 = smj_value;
/* 124 */
/* 125 */             smj_value7 = smj_value1;
/* 126 */
/* 127 */             return true;
/* 128 */           }
/* 129 */           smj_leftRow = null;
/* 130 */         } else {
/*     */           // Keys are equal: buffer a copy of the right row and move on.
/* 131 */           smj_matches.add(smj_rightRow.copy());
/* 132 */           smj_rightRow = null;;
/* 133 */         }
/* 134 */       } while (smj_leftRow != null);
/* 135 */     }
/* 136 */     return false; // unreachable
/* 137 */   }
/* 138 */
/*     */   // For every left row that has matches, emits one output row per buffered right
/*     */   // row: the left row's two string columns plus
/*     */   // round(left.long(2) * 100 / right.long(2), 2) as a double, which is null when the
/*     */   // right value converts to 0.0. shouldStop() is checked once per left row, after
/*     */   // all of its matches have been emitted.
/* 139 */   protected void processNext() throws java.io.IOException {
/* 140 */     while (findNextInnerJoinRows(smj_leftInput, smj_rightInput)) {
/* 141 */       int smj_size = smj_matches.size();
/*     */       // Left-side columns are read once per left row and kept in fields.
/* 142 */       smj_isNull4 = smj_leftRow.isNullAt(0);
/* 143 */       smj_value8 = smj_isNull4 ? null : 
(smj_leftRow.getUTF8String(0));
/* 144 */       smj_isNull5 = smj_leftRow.isNullAt(1);
/* 145 */       smj_value9 = smj_isNull5 ? null : 
(smj_leftRow.getUTF8String(1));
/* 146 */       smj_value10 = smj_leftRow.getLong(2);
/* 147 */       for (int smj_i = 0; smj_i < smj_size; smj_i ++) {
/* 148 */         InternalRow smj_rightRow1 = (InternalRow) 
smj_matches.get(smj_i);
/* 149 */
/* 150 */         smj_numOutputRows.add(1);
/* 151 */
/*     */         // Divisor: the right row's long column 2, converted to double.
/* 152 */         long smj_value13 = smj_rightRow1.getLong(2);
/* 153 */         boolean project_isNull8 = false;
/* 154 */         double project_value8 = -1.0;
/* 155 */         if (!false) {
/* 156 */           project_value8 = (double) smj_value13;
/* 157 */         }
/*     */         // Division: a zero divisor yields null instead of dividing.
/* 158 */         boolean project_isNull3 = false;
/* 159 */         double project_value3 = -1.0;
/* 160 */         if (project_value8 == 0) {
/* 161 */           project_isNull3 = true;
/* 162 */         } else {
/*     */           // Dividend: left long column 2 times 100 (long arithmetic), then
/*     */           // converted to double.
/* 163 */           long project_value5 = -1L;
/* 164 */           project_value5 = smj_value10 * 100L;
/* 165 */           boolean project_isNull4 = false;
/* 166 */           double project_value4 = -1.0;
/* 167 */           if (!false) {
/* 168 */             project_value4 = (double) project_value5;
/* 169 */           }
/* 170 */           project_value3 = (double)(project_value4 / project_value8);
/* 171 */         }
/*     */         // Rounding to 2 decimal places, HALF_UP; NaN and infinities pass through
/*     */         // unchanged because BigDecimal cannot represent them.
/* 172 */         boolean project_isNull2 = project_isNull3;
/* 173 */         double project_value2 = -1.0;
/* 174 */         if (!project_isNull2) {
/* 175 */           if (Double.isNaN(project_value3) || 
Double.isInfinite(project_value3)) {
/* 176 */             project_value2 = project_value3;
/* 177 */           } else {
/* 178 */             project_value2 = 
java.math.BigDecimal.valueOf(project_value3).
/* 179 */             setScale(2, 
java.math.BigDecimal.ROUND_HALF_UP).doubleValue();
/* 180 */           }
/* 181 */         }
/*     */         // Assemble the 3-field output row.
/* 182 */         project_holder.reset();
/* 183 */
/* 184 */         project_rowWriter.zeroOutNullBytes();
/* 185 */
/* 186 */         if (smj_isNull4) {
/* 187 */           project_rowWriter.setNullAt(0);
/* 188 */         } else {
/* 189 */           project_rowWriter.write(0, smj_value8);
/* 190 */         }
/* 191 */
/* 192 */         if (smj_isNull5) {
/* 193 */           project_rowWriter.setNullAt(1);
/* 194 */         } else {
/* 195 */           project_rowWriter.write(1, smj_value9);
/* 196 */         }
/* 197 */
/* 198 */         if (project_isNull2) {
/* 199 */           project_rowWriter.setNullAt(2);
/* 200 */         } else {
/* 201 */           project_rowWriter.write(2, project_value2);
/* 202 */         }
/* 203 */         project_result.setTotalSize(project_holder.totalSize());
/*     */         // A copy is appended because project_result is overwritten on the next
/*     */         // iteration of this loop before control returns to the consumer.
/* 204 */         append(project_result.copy());
/* 205 */
/* 206 */       }
/* 207 */       if (shouldStop()) return;
/* 208 */     }
/* 209 */   }
/* 210 */ }

== Subtree 3 / 5 ==
*HashAggregate(key=[Origin#155,UniqueCarrier#147], 
functions=[partial_count(1)], output=[Origin#155,UniqueCarrier#147,count#303L])
+- *Project [UniqueCarrier#147, Origin#155]
   +- *Filter (((isnotnull(UniqueCarrier#147) && isnotnull(Origin#155)) && 
isnotnull(Dest#156)) && (Dest#156 = ORD))
      +- *Scan csv [UniqueCarrier#147,Origin#155,Dest#156] Format: CSV, 
InputPaths: file:/home/robbins/brandberry/2008.csv, PushedFilters: 
[IsNotNull(UniqueCarrier), IsNotNull(Origin), IsNotNull(Dest), 
EqualTo(Dest,ORD)], ReadSchema: 
struct<UniqueCarrier:string,Origin:string,Dest:string>

Generated code:
/**
 * Entry point of this generated unit: wraps the supplied references array in a
 * new GeneratedIterator and returns it typed as Object.
 *
 * @param references passed through unchanged to the GeneratedIterator constructor
 * @return the newly constructed GeneratedIterator
 */
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private boolean agg_initAgg;
/* 008 */   private boolean agg_bufIsNull;
/* 009 */   private long agg_bufValue;
/* 010 */   private agg_VectorizedHashMap agg_vectorizedHashMap;
/* 011 */   private 
java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row> 
agg_vectorizedHashMapIter;
/* 012 */   private org.apache.spark.sql.execution.aggregate.HashAggregateExec 
agg_plan;
/* 013 */   private 
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 014 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter 
agg_sorter;
/* 015 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 016 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_peakMemory;
/* 017 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_spillSize;
/* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric 
scan_numOutputRows;
/* 019 */   private scala.collection.Iterator scan_input;
/* 020 */   private org.apache.spark.sql.execution.metric.SQLMetric 
filter_numOutputRows;
/* 021 */   private UnsafeRow filter_result;
/* 022 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
/* 023 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
filter_rowWriter;
/* 024 */   private UnsafeRow project_result;
/* 025 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
/* 026 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
project_rowWriter;
/* 027 */   private UnsafeRow agg_result2;
/* 028 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 029 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 030 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner 
agg_unsafeRowJoiner;
/* 031 */   private org.apache.spark.sql.execution.metric.SQLMetric 
wholestagecodegen_numOutputRows;
/* 032 */   private org.apache.spark.sql.execution.metric.SQLMetric 
wholestagecodegen_aggTime;
/* 033 */   private UnsafeRow wholestagecodegen_result;
/* 034 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
wholestagecodegen_holder;
/* 035 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
wholestagecodegen_rowWriter;
/* 036 */
/**
 * Stores the references array. No other field is assigned here.
 *
 * @param references kept as-is in the references field
 */
/* 037 */   public GeneratedIterator(Object[] references) {
/* 038 */     this.references = references;
/* 039 */   }
/* 040 */
/**
 * Binds this iterator to its partition and to the objects handed over in the
 * references array, and allocates the row buffers used by the other methods.
 *
 * Slots read here: references[0] (HashAggregateExec), [1]-[4] and [6]-[7]
 * (SQLMetric). references[5] is not read here; agg_doAggregateWithKeys() reads
 * it as an Expression.
 *
 * Each UnsafeRow / BufferHolder / UnsafeRowWriter triple is built with a field
 * count (3, 2, 2, 3) and a second BufferHolder argument (96 or 64).
 * NOTE(review): the second argument is presumably an initial buffer size in
 * bytes; confirm against BufferHolder.
 * NOTE(review): the filter_* and project_* row, holder and writer created here
 * are not referenced by any other method of this class.
 *
 * @param index  stored in partitionIndex (not declared in this class;
 *               presumably inherited from BufferedRowIterator)
 * @param inputs only inputs[0] is read; it becomes scan_input
 */
/* 041 */   public void init(int index, scala.collection.Iterator inputs[]) {
/* 042 */     partitionIndex = index;
/* 043 */     agg_initAgg = false;
/* 044 */
/* 045 */     agg_vectorizedHashMap = new agg_VectorizedHashMap();
/* 046 */
/* 047 */     this.agg_plan = 
(org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0];
/* 048 */
/* 049 */     this.agg_peakMemory = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 050 */     this.agg_spillSize = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 051 */     this.scan_numOutputRows = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 052 */     scan_input = inputs[0];
/* 053 */     this.filter_numOutputRows = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[4];
/* 054 */     filter_result = new UnsafeRow(3);
/* 055 */     this.filter_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 
96);
/* 056 */     this.filter_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder,
 3);
/* 057 */     project_result = new UnsafeRow(2);
/* 058 */     this.project_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 
64);
/* 059 */     this.project_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder,
 2);
/* 060 */     agg_result2 = new UnsafeRow(2);
/* 061 */     this.agg_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result2, 64);
/* 062 */     this.agg_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 
2);
/* 063 */     agg_unsafeRowJoiner = agg_plan.createUnsafeJoiner();
// references[5] is skipped here; the next slots read are [6] and [7].
/* 064 */     this.wholestagecodegen_numOutputRows = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[6];
/* 065 */     this.wholestagecodegen_aggTime = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[7];
/* 066 */     wholestagecodegen_result = new UnsafeRow(3);
/* 067 */     this.wholestagecodegen_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(wholestagecodegen_result,
 64);
/* 068 */     this.wholestagecodegen_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(wholestagecodegen_holder,
 3);
/* 069 */   }
/* 070 */
/**
 * Fixed-capacity hash table used as the fast path for the partial count.
 * Rows live in a ColumnarBatch with columns (Origin: string, UniqueCarrier:
 * string, count: long); buckets[] maps a hash slot to a row number in that
 * batch, with -1 marking an empty slot.
 *
 * Sizing as written: capacity = 1 << 16 rows, numBuckets = capacity / 0.5 =
 * 131072 slots (a power of two, which the (numBuckets - 1) masking in
 * findOrInsert relies on), and at most maxSteps = 2 consecutive slots are
 * probed per lookup.
 *
 * findOrInsert() returns null when the row capacity is exhausted or when
 * neither an empty slot nor a matching key is found within maxSteps probes.
 * It also writes the enclosing iterator's agg_bufIsNull / agg_bufValue fields,
 * which is only possible because this is a non-static inner class.
 */
/* 071 */   public class agg_VectorizedHashMap {
/* 072 */     private org.apache.spark.sql.execution.vectorized.ColumnarBatch 
batch;
/* 073 */     private org.apache.spark.sql.execution.vectorized.ColumnarBatch 
aggregateBufferBatch;
/* 074 */     private int[] buckets;
/* 075 */     private int capacity = 1 << 16;
/* 076 */     private double loadFactor = 0.5;
/* 077 */     private int numBuckets = (int) (capacity / loadFactor);
/* 078 */     private int maxSteps = 2;
/* 079 */     private int numRows = 0;
/* 080 */     private org.apache.spark.sql.types.StructType schema = new 
org.apache.spark.sql.types.StructType().add("Origin", 
org.apache.spark.sql.types.DataTypes.StringType)
/* 081 */     .add("UniqueCarrier", 
org.apache.spark.sql.types.DataTypes.StringType)
/* 082 */     .add("count", org.apache.spark.sql.types.DataTypes.LongType);
/* 083 */     private org.apache.spark.sql.types.StructType 
aggregateBufferSchema =
/* 084 */     new org.apache.spark.sql.types.StructType().add("count", 
org.apache.spark.sql.types.DataTypes.LongType);
/* 085 */
// Allocates both batches on heap with room for capacity rows and marks every
// bucket as empty (-1).
/* 086 */     public agg_VectorizedHashMap() {
/* 087 */       batch = 
org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
/* 088 */         org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
/* 089 */       // TODO: Possibly generate this projection in HashAggregate 
directly
/* 090 */       aggregateBufferBatch = 
org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
/* 091 */         aggregateBufferSchema, 
org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
// aggregateBufferSchema has a single field, so column i of aggregateBufferBatch
// is set from column i+2 (the count column) of batch.
// NOTE(review): presumably this makes both batches share the count column;
// confirm against ColumnarBatch.setColumn.
/* 092 */       for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
/* 093 */         aggregateBufferBatch.setColumn(i, batch.column(i+2));
/* 094 */       }
/* 095 */
/* 096 */       buckets = new int[numBuckets];
/* 097 */       java.util.Arrays.fill(buckets, -1);
/* 098 */     }
/* 099 */
// Looks up the two-part key and returns its row from aggregateBufferBatch,
// inserting a new row with count 0 when an empty slot is hit and numRows is
// still below capacity. Returns null otherwise (see the class comment).
/* 100 */     public 
org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row 
findOrInsert(UTF8String agg_key, UTF8String agg_key1) {
/* 101 */       long h = hash(agg_key, agg_key1);
/* 102 */       int step = 0;
// The cast applies to h before the mask: h is truncated to int, then ANDed
// with (numBuckets - 1).
/* 103 */       int idx = (int) h & (numBuckets - 1);
/* 104 */       while (step < maxSteps) {
/* 105 */         // Return bucket index if it's either an empty slot or 
already contains the key
/* 106 */         if (buckets[idx] == -1) {
/* 107 */           if (numRows < capacity) {
/* 108 */             // Initialize aggregate keys
/* 109 */             batch.column(0).putByteArray(numRows, agg_key.getBytes());
/* 110 */             batch.column(1).putByteArray(numRows, 
agg_key1.getBytes());
/* 111 */
/* 112 */             agg_bufIsNull = false;
/* 113 */             agg_bufValue = 0L;
/* 114 */
/* 115 */             // Initialize aggregate values
/* 116 */
// agg_bufIsNull was set to false just above, so the else branch below is
// never taken.
/* 117 */             if (!agg_bufIsNull) {
/* 118 */               batch.column(2).putLong(numRows, agg_bufValue);
/* 119 */             } else {
/* 120 */               batch.column(2).putNull(numRows);
/* 121 */             }
/* 122 */
/* 123 */             buckets[idx] = numRows++;
/* 124 */             batch.setNumRows(numRows);
/* 125 */             aggregateBufferBatch.setNumRows(numRows);
/* 126 */             return aggregateBufferBatch.getRow(buckets[idx]);
/* 127 */           } else {
/* 128 */             // No more space
/* 129 */             return null;
/* 130 */           }
/* 131 */         } else if (equals(idx, agg_key, agg_key1)) {
/* 132 */           return aggregateBufferBatch.getRow(buckets[idx]);
/* 133 */         }
// Occupied by a different key: move to the next slot, wrapping with the mask.
/* 134 */         idx = (idx + 1) & (numBuckets - 1);
/* 135 */         step++;
/* 136 */       }
/* 137 */       // Didn't find it
/* 138 */       return null;
/* 139 */     }
/* 140 */
// True when the row stored at buckets[idx] has both key columns equal to the
// supplied keys. Callers must pass an idx whose bucket is not -1.
/* 141 */     private boolean equals(int idx, UTF8String agg_key, UTF8String 
agg_key1) {
/* 142 */       return 
(batch.column(0).getUTF8String(buckets[idx]).equals(agg_key)) && 
(batch.column(1).getUTF8String(buckets[idx]).equals(agg_key1));
/* 143 */     }
/* 144 */
// Hashes each key byte-by-byte into an int, then folds the two int results
// into a long using the same mixing expression.
/* 145 */     private long hash(UTF8String agg_key, UTF8String agg_key1) {
/* 146 */       long agg_hash = 0;
/* 147 */
/* 148 */       int agg_result = 0;
// getBytes() is called in the loop condition and again in the body on every
// iteration; each byte is widened to int (sign-extended) before mixing.
// NOTE(review): presumably each getBytes() call copies the bytes; confirm
// against UTF8String.getBytes.
/* 149 */       for (int i = 0; i < agg_key.getBytes().length; i++) {
/* 150 */         int agg_hash1 = agg_key.getBytes()[i];
/* 151 */         agg_result = (agg_result ^ (0x9e3779b9)) + agg_hash1 + 
(agg_result << 6) + (agg_result >>> 2);
/* 152 */       }
/* 153 */
// 0x9e3779b9 is an int literal with the top bit set, so it is sign-extended
// when combined with the long agg_hash.
/* 154 */       agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result + (agg_hash 
<< 6) + (agg_hash >>> 2);
/* 155 */
/* 156 */       int agg_result1 = 0;
/* 157 */       for (int i = 0; i < agg_key1.getBytes().length; i++) {
/* 158 */         int agg_hash2 = agg_key1.getBytes()[i];
/* 159 */         agg_result1 = (agg_result1 ^ (0x9e3779b9)) + agg_hash2 + 
(agg_result1 << 6) + (agg_result1 >>> 2);
/* 160 */       }
/* 161 */
/* 162 */       agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result1 + (agg_hash 
<< 6) + (agg_hash >>> 2);
/* 163 */
/* 164 */       return agg_hash;
/* 165 */     }
/* 166 */
/* 167 */     public 
java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row>
/* 168 */     rowIterator() {
// Iterates the full batch (both keys and the count), not aggregateBufferBatch.
/* 169 */       return batch.rowIterator();
/* 170 */     }
/* 171 */
/* 172 */     public void close() {
// Only batch is closed; aggregateBufferBatch is not closed here.
/* 173 */       batch.close();
/* 174 */     }
/* 175 */
/* 176 */   }
/* 177 */
/**
 * Consumes all of scan_input and accumulates count(1) per (Origin,
 * UniqueCarrier) key.
 *
 * Per input row:
 *  1. Filter: ordinal 2 must be non-null and equal to the value obtained from
 *     references[5]; ordinals 0 and 1 must be non-null. Rows failing any check
 *     are skipped with continue. Per the Scan column order in the plan printed
 *     for this subtree, ordinals 0, 1, 2 are UniqueCarrier, Origin, Dest.
 *  2. Fast path: agg_vectorizedHashMap.findOrInsert(ordinal 1, ordinal 0).
 *  3. Fallback when the fast path returns null: build the key in agg_result2,
 *     hash it with Murmur3 (seed 42) and look it up in agg_hashMap; if that
 *     returns null, spill agg_hashMap into agg_sorter and retry once.
 *  4. Add 1 to the long at position 0 of whichever buffer was obtained.
 *
 * After the loop it assigns agg_vectorizedHashMapIter and agg_mapIter, which
 * processNext() then drains.
 *
 * NOTE(review): the shouldStop() return at the bottom of the loop exits before
 * those two iterators are assigned, and processNext() has already set
 * agg_initAgg, so this method is not called again. Whether shouldStop() can be
 * true here is not visible in this class; confirm against BufferedRowIterator.
 *
 * @throws java.io.IOException declared by the method; presumably raised by the
 *         hash map or sorter calls
 * @throws OutOfMemoryError if the aggregation buffer is still null after the
 *         spill-and-retry
 */
/* 178 */   private void agg_doAggregateWithKeys() throws java.io.IOException {
/* 179 */     agg_hashMap = agg_plan.createHashMap();
/* 180 */
/* 181 */     while (scan_input.hasNext()) {
/* 182 */       InternalRow scan_row = (InternalRow) scan_input.next();
/* 183 */       scan_numOutputRows.add(1);
/* 184 */       boolean scan_isNull5 = scan_row.isNullAt(2);
/* 185 */       UTF8String scan_value5 = scan_isNull5 ? null : 
(scan_row.getUTF8String(2));
/* 186 */
/* 187 */       if (!(!(scan_isNull5))) continue;
/* 188 */
// references[5] is evaluated with a null input row on every iteration.
// NOTE(review): presumably the constant ORD from the Filter in the plan;
// confirm.
/* 189 */       Object filter_obj = ((Expression) references[5]).eval(null);
/* 190 */       UTF8String filter_value4 = (UTF8String) filter_obj;
/* 191 */       boolean filter_value2 = false;
/* 192 */       filter_value2 = scan_value5.equals(filter_value4);
/* 193 */       if (!filter_value2) continue;
/* 194 */
/* 195 */       boolean scan_isNull3 = scan_row.isNullAt(0);
/* 196 */       UTF8String scan_value3 = scan_isNull3 ? null : 
(scan_row.getUTF8String(0));
/* 197 */
/* 198 */       if (!(!(scan_isNull3))) continue;
/* 199 */
/* 200 */       boolean scan_isNull4 = scan_row.isNullAt(1);
/* 201 */       UTF8String scan_value4 = scan_isNull4 ? null : 
(scan_row.getUTF8String(1));
/* 202 */
/* 203 */       if (!(!(scan_isNull4))) continue;
/* 204 */
/* 205 */       filter_numOutputRows.add(1);
/* 206 */
/* 207 */       UnsafeRow agg_unsafeRowAggBuffer = null;
/* 208 */       org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row 
agg_vectorizedAggBuffer = null;
/* 209 */
// Both conditions below are constants, so findOrInsert is always attempted.
/* 210 */       if (true) {
/* 211 */         if (!false && !false) {
/* 212 */           agg_vectorizedAggBuffer = 
agg_vectorizedHashMap.findOrInsert(
/* 213 */             scan_value4, scan_value3);
/* 214 */         }
/* 215 */       }
/* 216 */
/* 217 */       if (agg_vectorizedAggBuffer == null) {
/* 218 */         // generate grouping key
// No zeroOutNullBytes() call precedes these writes; both key values were
// already checked non-null above.
/* 219 */         agg_holder.reset();
/* 220 */
/* 221 */         agg_rowWriter.write(0, scan_value4);
/* 222 */
/* 223 */         agg_rowWriter.write(1, scan_value3);
/* 224 */         agg_result2.setTotalSize(agg_holder.totalSize());
/* 225 */         int agg_value6 = 42;
/* 226 */
/* 227 */         if (!false) {
/* 228 */           agg_value6 = 
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(scan_value4.getBaseObject(),
 scan_value4.getBaseOffset(), scan_value4.numBytes(), agg_value6);
/* 229 */         }
/* 230 */
/* 231 */         if (!false) {
/* 232 */           agg_value6 = 
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(scan_value3.getBaseObject(),
 scan_value3.getBaseOffset(), scan_value3.numBytes(), agg_value6);
/* 233 */         }
/* 234 */         if (true) {
/* 235 */           // try to get the buffer from hash map
/* 236 */           agg_unsafeRowAggBuffer =
/* 237 */           agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result2, 
agg_value6);
/* 238 */         }
/* 239 */         if (agg_unsafeRowAggBuffer == null) {
/* 240 */           if (agg_sorter == null) {
/* 241 */             agg_sorter = 
agg_hashMap.destructAndCreateExternalSorter();
/* 242 */           } else {
/* 243 */             
agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 244 */           }
/* 245 */
/* 246 */           // the hash map had be spilled, it should have enough 
memory now,
/* 247 */           // try  to allocate buffer again.
/* 248 */           agg_unsafeRowAggBuffer =
/* 249 */           agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result2, 
agg_value6);
/* 250 */           if (agg_unsafeRowAggBuffer == null) {
/* 251 */             // failed to allocate the first page
/* 252 */             throw new OutOfMemoryError("No enough memory for 
aggregation");
/* 253 */           }
/* 254 */         }
/* 255 */       }
/* 256 */
/* 257 */       if (agg_vectorizedAggBuffer != null) {
/* 258 */         // update vectorized row
/* 259 */
/* 260 */         // common sub-expressions
/* 261 */
/* 262 */         // evaluate aggregate function
/* 263 */         long agg_value10 = agg_vectorizedAggBuffer.getLong(0);
/* 264 */
/* 265 */         long agg_value9 = -1L;
/* 266 */         agg_value9 = agg_value10 + 1L;
/* 267 */         // update vectorized row
/* 268 */         agg_vectorizedAggBuffer.setLong(0, agg_value9);
/* 269 */
/* 270 */       } else {
/* 271 */         // update unsafe row
/* 272 */
/* 273 */         // common sub-expressions
/* 274 */
/* 275 */         // evaluate aggregate function
/* 276 */         long agg_value13 = agg_unsafeRowAggBuffer.getLong(0);
/* 277 */
/* 278 */         long agg_value12 = -1L;
/* 279 */         agg_value12 = agg_value13 + 1L;
/* 280 */         // update unsafe row buffer
/* 281 */         agg_unsafeRowAggBuffer.setLong(0, agg_value12);
/* 282 */
/* 283 */       }
/* 284 */       if (shouldStop()) return;
/* 285 */     }
/* 286 */
/* 287 */     agg_vectorizedHashMapIter = agg_vectorizedHashMap.rowIterator();
/* 288 */
/* 289 */     agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter, 
agg_peakMemory, agg_spillSize);
/* 290 */   }
/* 291 */
/**
 * Produces output rows. The first call runs agg_doAggregateWithKeys() and adds
 * the elapsed time in milliseconds (nanoTime difference / 1000000) to
 * wholestagecodegen_aggTime.
 *
 * Output is emitted in two phases:
 *  1. rows of the vectorized map: the three columns are rewritten into
 *     wholestagecodegen_result, which is passed to append();
 *  2. rows of agg_mapIter: key and buffer are joined with agg_unsafeRowJoiner
 *     and the joined row is passed to append().
 * Each phase returns early when shouldStop() is true; because agg_initAgg is
 * already set, a later call resumes draining the same iterators.
 *
 * Finally agg_mapIter is closed and agg_hashMap is freed when agg_sorter is
 * null.
 *
 * NOTE(review): phase 1 appends the same wholestagecodegen_result instance for
 * every row without copy() (the join iterator earlier in this dump appends
 * project_result.copy()). Whether that is safe depends on what append() and
 * the consumer do with the row; not visible here.
 * NOTE(review): agg_vectorizedHashMap.close() is reached on every call that
 * gets past phase 1, so it runs again if phase 2 returned early on a previous
 * call. Confirm that ColumnarBatch.close() and the exhausted iterator tolerate
 * this.
 *
 * @throws java.io.IOException declared by the method
 */
/* 292 */   protected void processNext() throws java.io.IOException {
/* 293 */     if (!agg_initAgg) {
/* 294 */       agg_initAgg = true;
/* 295 */       long wholestagecodegen_beforeAgg = System.nanoTime();
/* 296 */       agg_doAggregateWithKeys();
/* 297 */       wholestagecodegen_aggTime.add((System.nanoTime() - 
wholestagecodegen_beforeAgg) / 1000000);
/* 298 */     }
/* 299 */
/* 300 */     // output the result
/* 301 */
// Phase 1: rows held by the vectorized hash map.
/* 302 */     while (agg_vectorizedHashMapIter.hasNext()) {
/* 303 */       wholestagecodegen_numOutputRows.add(1);
/* 304 */       org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row 
wholestagecodegen_vectorizedHashMapRow =
/* 305 */       (org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row)
/* 306 */       agg_vectorizedHashMapIter.next();
/* 307 */
/* 308 */       wholestagecodegen_holder.reset();
/* 309 */
/* 310 */       wholestagecodegen_rowWriter.zeroOutNullBytes();
/* 311 */
/* 312 */       boolean wholestagecodegen_isNull = 
wholestagecodegen_vectorizedHashMapRow.isNullAt(0);
/* 313 */       UTF8String wholestagecodegen_value = wholestagecodegen_isNull ? 
null : (wholestagecodegen_vectorizedHashMapRow.getUTF8String(0));
/* 314 */       if (wholestagecodegen_isNull) {
/* 315 */         wholestagecodegen_rowWriter.setNullAt(0);
/* 316 */       } else {
/* 317 */         wholestagecodegen_rowWriter.write(0, wholestagecodegen_value);
/* 318 */       }
/* 319 */
/* 320 */       boolean wholestagecodegen_isNull1 = 
wholestagecodegen_vectorizedHashMapRow.isNullAt(1);
/* 321 */       UTF8String wholestagecodegen_value1 = wholestagecodegen_isNull1 
? null : (wholestagecodegen_vectorizedHashMapRow.getUTF8String(1));
/* 322 */       if (wholestagecodegen_isNull1) {
/* 323 */         wholestagecodegen_rowWriter.setNullAt(1);
/* 324 */       } else {
/* 325 */         wholestagecodegen_rowWriter.write(1, 
wholestagecodegen_value1);
/* 326 */       }
/* 327 */
// Column 2 (the count) is read with getLong and written without a null check.
/* 328 */       long wholestagecodegen_value2 = 
wholestagecodegen_vectorizedHashMapRow.getLong(2);
/* 329 */       wholestagecodegen_rowWriter.write(2, wholestagecodegen_value2);
/* 330 */       
wholestagecodegen_result.setTotalSize(wholestagecodegen_holder.totalSize());
/* 331 */
/* 332 */       append(wholestagecodegen_result);
/* 333 */
/* 334 */       if (shouldStop()) return;
/* 335 */     }
/* 336 */
/* 337 */     agg_vectorizedHashMap.close();
/* 338 */
// Phase 2: rows held by the fallback map / sorter iterator.
/* 339 */     while (agg_mapIter.next()) {
/* 340 */       wholestagecodegen_numOutputRows.add(1);
/* 341 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 342 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 343 */
/* 344 */       UnsafeRow agg_resultRow = agg_unsafeRowJoiner.join(agg_aggKey, 
agg_aggBuffer);
/* 345 */
/* 346 */       append(agg_resultRow);
/* 347 */
/* 348 */       if (shouldStop()) return;
/* 349 */     }
/* 350 */
/* 351 */     agg_mapIter.close();
/* 352 */     if (agg_sorter == null) {
/* 353 */       agg_hashMap.free();
/* 354 */     }
/* 355 */   }
/* 356 */ }

== Subtree 4 / 5 ==
*Sort [Origin#155 ASC, UniqueCarrier#147 ASC], false, 0
+- *HashAggregate(key=[Origin#155,UniqueCarrier#147], functions=[count(1)], 
output=[Origin#155,UniqueCarrier#147,total#97L])
   +- Exchange hashpartitioning(Origin#155, UniqueCarrier#147, 200)
      +- *HashAggregate(key=[Origin#155,UniqueCarrier#147], 
functions=[partial_count(1)], output=[Origin#155,UniqueCarrier#147,count#303L])
         +- *Project [UniqueCarrier#147, Origin#155]
            +- *Filter (((isnotnull(UniqueCarrier#147) && 
isnotnull(Origin#155)) && isnotnull(Dest#156)) && (Dest#156 = ORD))
               +- *Scan csv [UniqueCarrier#147,Origin#155,Dest#156] Format: 
CSV, InputPaths: file:/home/robbins/brandberry/2008.csv, PushedFilters: 
[IsNotNull(UniqueCarrier), IsNotNull(Origin), IsNotNull(Dest), 
EqualTo(Dest,ORD)], ReadSchema: 
struct<UniqueCarrier:string,Origin:string,Dest:string>

Generated code:
/**
 * Entry point of this generated unit: wraps the supplied references array in a
 * new GeneratedIterator and returns it typed as Object.
 *
 * @param references passed through unchanged to the GeneratedIterator constructor
 * @return the newly constructed GeneratedIterator
 */
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private boolean sort_needToSort;
/* 008 */   private org.apache.spark.sql.execution.SortExec sort_plan;
/* 009 */   private org.apache.spark.sql.execution.UnsafeExternalRowSorter 
sort_sorter;
/* 010 */   private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 011 */   private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
/* 012 */   private boolean agg_initAgg;
/* 013 */   private boolean agg_bufIsNull;
/* 014 */   private long agg_bufValue;
/* 015 */   private org.apache.spark.sql.execution.aggregate.HashAggregateExec 
agg_plan;
/* 016 */   private 
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 017 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter 
agg_sorter;
/* 018 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 019 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_peakMemory;
/* 020 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_spillSize;
/* 021 */   private scala.collection.Iterator inputadapter_input;
/* 022 */   private UnsafeRow agg_result;
/* 023 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 024 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 025 */   private UnsafeRow agg_result1;
/* 026 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder1;
/* 027 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
agg_rowWriter1;
/* 028 */   private org.apache.spark.sql.execution.metric.SQLMetric 
sort_numOutputRows;
/* 029 */   private org.apache.spark.sql.execution.metric.SQLMetric 
sort_aggTime;
/* 030 */   private org.apache.spark.sql.execution.metric.SQLMetric 
sort_peakMemory;
/* 031 */   private org.apache.spark.sql.execution.metric.SQLMetric 
sort_spillSize;
/* 032 */   private org.apache.spark.sql.execution.metric.SQLMetric 
sort_sortTime;
/* 033 */
/**
 * Stores the references array. No other field is assigned here.
 *
 * @param references kept as-is in the references field
 */
/* 034 */   public GeneratedIterator(Object[] references) {
/* 035 */     this.references = references;
/* 036 */   }
/* 037 */
/**
 * Binds this iterator to its partition and to the objects in the references
 * array, and allocates the two row buffers used for aggregation.
 *
 * Slots read: references[0] (SortExec), [1] (HashAggregateExec) and [2]-[8]
 * (SQLMetric, assigned in order to agg_peakMemory, agg_spillSize,
 * sort_numOutputRows, sort_aggTime, sort_peakMemory, sort_spillSize,
 * sort_sortTime).
 * sort_sorter comes from sort_plan.createSorter() and sort_metrics from
 * org.apache.spark.TaskContext.get().taskMetrics().
 *
 * agg_result (2 fields) holds the grouping key built in
 * agg_doAggregateWithKeys(); agg_result1 (3 fields) holds the row that
 * sort_addToSorter() inserts into the sorter.
 *
 * @param index  stored in partitionIndex (not declared in this class;
 *               presumably inherited from BufferedRowIterator)
 * @param inputs only inputs[0] is read; it becomes inputadapter_input
 */
/* 038 */   public void init(int index, scala.collection.Iterator inputs[]) {
/* 039 */     partitionIndex = index;
/* 040 */     sort_needToSort = true;
/* 041 */     this.sort_plan = (org.apache.spark.sql.execution.SortExec) 
references[0];
/* 042 */     sort_sorter = sort_plan.createSorter();
/* 043 */     sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 044 */
/* 045 */     agg_initAgg = false;
/* 046 */
/* 047 */     this.agg_plan = 
(org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[1];
/* 048 */
/* 049 */     this.agg_peakMemory = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 050 */     this.agg_spillSize = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 051 */     inputadapter_input = inputs[0];
/* 052 */     agg_result = new UnsafeRow(2);
/* 053 */     this.agg_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 64);
/* 054 */     this.agg_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 
2);
/* 055 */     agg_result1 = new UnsafeRow(3);
/* 056 */     this.agg_holder1 = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result1, 64);
/* 057 */     this.agg_rowWriter1 = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder1, 
3);
/* 058 */     this.sort_numOutputRows = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[4];
/* 059 */     this.sort_aggTime = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[5];
/* 060 */     this.sort_peakMemory = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[6];
/* 061 */     this.sort_spillSize = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[7];
/* 062 */     this.sort_sortTime = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[8];
/* 063 */   }
/* 064 */
/**
 * Consumes all of inputadapter_input and merges the incoming partial counts
 * per two-column key into agg_hashMap.
 *
 * Per input row: ordinals 0 and 1 are read as nullable UTF8String keys and
 * ordinal 2 as a long with no null check. The key is written to agg_result
 * (null keys via setNullAt), hashed with Murmur3 starting from seed 42
 * (null keys are left out of the hash), and looked up in agg_hashMap. If the
 * lookup returns null the map is spilled into agg_sorter and the lookup is
 * retried once. The long at position 0 of the buffer is then increased by the
 * value from ordinal 2.
 *
 * agg_vectorizedAggBuffer is declared as null and never assigned in this
 * method, so the first null check is always true and the
 * update-vectorized-row branch is never executed.
 *
 * After the loop agg_mapIter is obtained from agg_plan.finishAggregate(...).
 * NOTE(review): the shouldStop() return inside the loop exits before
 * agg_mapIter is assigned; whether shouldStop() can be true at that point is
 * not visible in this class.
 *
 * @throws java.io.IOException declared by the method; presumably raised by the
 *         hash map or sorter calls
 * @throws OutOfMemoryError if the aggregation buffer is still null after the
 *         spill-and-retry
 */
/* 065 */   private void agg_doAggregateWithKeys() throws java.io.IOException {
/* 066 */     agg_hashMap = agg_plan.createHashMap();
/* 067 */
/* 068 */     while (inputadapter_input.hasNext()) {
/* 069 */       InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
/* 070 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 071 */       UTF8String inputadapter_value = inputadapter_isNull ? null : 
(inputadapter_row.getUTF8String(0));
/* 072 */       boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 073 */       UTF8String inputadapter_value1 = inputadapter_isNull1 ? null : 
(inputadapter_row.getUTF8String(1));
/* 074 */       long inputadapter_value2 = inputadapter_row.getLong(2);
/* 075 */
/* 076 */       UnsafeRow agg_unsafeRowAggBuffer = null;
/* 077 */       org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row 
agg_vectorizedAggBuffer = null;
/* 078 */
// Always true: agg_vectorizedAggBuffer was just declared as null.
/* 079 */       if (agg_vectorizedAggBuffer == null) {
/* 080 */         // generate grouping key
/* 081 */         agg_holder.reset();
/* 082 */
/* 083 */         agg_rowWriter.zeroOutNullBytes();
/* 084 */
/* 085 */         if (inputadapter_isNull) {
/* 086 */           agg_rowWriter.setNullAt(0);
/* 087 */         } else {
/* 088 */           agg_rowWriter.write(0, inputadapter_value);
/* 089 */         }
/* 090 */
/* 091 */         if (inputadapter_isNull1) {
/* 092 */           agg_rowWriter.setNullAt(1);
/* 093 */         } else {
/* 094 */           agg_rowWriter.write(1, inputadapter_value1);
/* 095 */         }
/* 096 */         agg_result.setTotalSize(agg_holder.totalSize());
/* 097 */         int agg_value6 = 42;
/* 098 */
/* 099 */         if (!inputadapter_isNull) {
/* 100 */           agg_value6 = 
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value.getBaseObject(),
 inputadapter_value.getBaseOffset(), inputadapter_value.numBytes(), agg_value6);
/* 101 */         }
/* 102 */
/* 103 */         if (!inputadapter_isNull1) {
/* 104 */           agg_value6 = 
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value1.getBaseObject(),
 inputadapter_value1.getBaseOffset(), inputadapter_value1.numBytes(), 
agg_value6);
/* 105 */         }
/* 106 */         if (true) {
/* 107 */           // try to get the buffer from hash map
/* 108 */           agg_unsafeRowAggBuffer =
/* 109 */           agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, 
agg_value6);
/* 110 */         }
/* 111 */         if (agg_unsafeRowAggBuffer == null) {
/* 112 */           if (agg_sorter == null) {
/* 113 */             agg_sorter = 
agg_hashMap.destructAndCreateExternalSorter();
/* 114 */           } else {
/* 115 */             
agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 116 */           }
/* 117 */
/* 118 */           // the hash map had be spilled, it should have enough 
memory now,
/* 119 */           // try  to allocate buffer again.
/* 120 */           agg_unsafeRowAggBuffer =
/* 121 */           agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, 
agg_value6);
/* 122 */           if (agg_unsafeRowAggBuffer == null) {
/* 123 */             // failed to allocate the first page
/* 124 */             throw new OutOfMemoryError("No enough memory for 
aggregation");
/* 125 */           }
/* 126 */         }
/* 127 */       }
/* 128 */
// Never true in this method (see above), so only the else branch runs.
/* 129 */       if (agg_vectorizedAggBuffer != null) {
/* 130 */         // update vectorized row
/* 131 */
/* 132 */       } else {
/* 133 */         // update unsafe row
/* 134 */
/* 135 */         // common sub-expressions
/* 136 */
/* 137 */         // evaluate aggregate function
/* 138 */         long agg_value10 = agg_unsafeRowAggBuffer.getLong(0);
/* 139 */
/* 140 */         long agg_value9 = -1L;
/* 141 */         agg_value9 = agg_value10 + inputadapter_value2;
/* 142 */         // update unsafe row buffer
/* 143 */         agg_unsafeRowAggBuffer.setLong(0, agg_value9);
/* 144 */
/* 145 */       }
/* 146 */       if (shouldStop()) return;
/* 147 */     }
/* 148 */
/* 149 */     agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter, 
agg_peakMemory, agg_spillSize);
/* 150 */   }
/* 151 */
/**
 * Runs the aggregation on first use and feeds every aggregated row into
 * sort_sorter. The elapsed aggregation time in milliseconds (nanoTime
 * difference / 1000000) is added to sort_aggTime.
 *
 * For each entry of agg_mapIter the two key columns and the long at position 0
 * of the buffer are written into agg_result1, which is then handed to
 * sort_sorter.insertRow(). sort_numOutputRows is incremented once per entry.
 * When the iterator is exhausted it is closed and agg_hashMap is freed if
 * agg_sorter is null.
 *
 * NOTE(review): agg_result1 is reused for every entry; this relies on
 * insertRow() not retaining the instance. Confirm against
 * UnsafeExternalRowSorter.insertRow.
 *
 * @throws java.io.IOException declared by the method
 */
/* 152 */   private void sort_addToSorter() throws java.io.IOException {
/* 153 */     if (!agg_initAgg) {
/* 154 */       agg_initAgg = true;
/* 155 */       long sort_beforeAgg = System.nanoTime();
/* 156 */       agg_doAggregateWithKeys();
/* 157 */       sort_aggTime.add((System.nanoTime() - sort_beforeAgg) / 
1000000);
/* 158 */     }
/* 159 */
/* 160 */     // output the result
/* 161 */
/* 162 */     while (agg_mapIter.next()) {
/* 163 */       sort_numOutputRows.add(1);
/* 164 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 165 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 166 */
/* 167 */       boolean agg_isNull11 = agg_aggKey.isNullAt(0);
/* 168 */       UTF8String agg_value12 = agg_isNull11 ? null : 
(agg_aggKey.getUTF8String(0));
/* 169 */       boolean agg_isNull12 = agg_aggKey.isNullAt(1);
/* 170 */       UTF8String agg_value13 = agg_isNull12 ? null : 
(agg_aggKey.getUTF8String(1));
/* 171 */       long agg_value14 = agg_aggBuffer.getLong(0);
/* 172 */
/* 173 */       agg_holder1.reset();
/* 174 */
/* 175 */       agg_rowWriter1.zeroOutNullBytes();
/* 176 */
/* 177 */       if (agg_isNull11) {
/* 178 */         agg_rowWriter1.setNullAt(0);
/* 179 */       } else {
/* 180 */         agg_rowWriter1.write(0, agg_value12);
/* 181 */       }
/* 182 */
/* 183 */       if (agg_isNull12) {
/* 184 */         agg_rowWriter1.setNullAt(1);
/* 185 */       } else {
/* 186 */         agg_rowWriter1.write(1, agg_value13);
/* 187 */       }
/* 188 */
/* 189 */       agg_rowWriter1.write(2, agg_value14);
/* 190 */       agg_result1.setTotalSize(agg_holder1.totalSize());
/* 191 */       sort_sorter.insertRow((UnsafeRow)agg_result1);
/* 192 */
// An early return here skips the close() / free() calls below.
/* 193 */       if (shouldStop()) return;
/* 194 */     }
/* 195 */
/* 196 */     agg_mapIter.close();
/* 197 */     if (agg_sorter == null) {
/* 198 */       agg_hashMap.free();
/* 199 */     }
/* 200 */
/* 201 */   }
/* 202 */
/**
 * On the first call: fills the sorter via sort_addToSorter(), sorts, records
 * sort time (getSortTimeNanos() / 1000000), peak memory and the growth of
 * memoryBytesSpilled in the corresponding metrics, and clears
 * sort_needToSort. On every call: appends rows from sort_sortedIter until it
 * is exhausted or shouldStop() is true.
 *
 * NOTE(review): sort_addToSorter() can return early through its own
 * shouldStop() check, yet this method calls sort_sorter.sort() and clears
 * sort_needToSort regardless. Whether shouldStop() can be true before any
 * append() is not visible here; confirm against BufferedRowIterator.
 *
 * @throws java.io.IOException declared by the method
 */
/* 203 */   protected void processNext() throws java.io.IOException {
/* 204 */     if (sort_needToSort) {
/* 205 */       long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 206 */       sort_addToSorter();
/* 207 */       sort_sortedIter = sort_sorter.sort();
/* 208 */       sort_sortTime.add(sort_sorter.getSortTimeNanos() / 1000000);
/* 209 */       sort_peakMemory.add(sort_sorter.getPeakMemoryUsage());
/* 210 */       sort_spillSize.add(sort_metrics.memoryBytesSpilled() - 
sort_spillSizeBefore);
/* 211 */       
sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 212 */       sort_needToSort = false;
/* 213 */     }
/* 214 */
/* 215 */     while (sort_sortedIter.hasNext()) {
/* 216 */       UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 217 */
/* 218 */       append(sort_outputRow);
/* 219 */
/* 220 */       if (shouldStop()) return;
/* 221 */     }
/* 222 */   }
/* 223 */ }

== Subtree 5 / 5 ==
*Sort [Origin#16 ASC, UniqueCarrier#8 ASC], false, 0
+- *HashAggregate(key=[Origin#16,UniqueCarrier#8], functions=[count(1)], 
output=[Origin#16,UniqueCarrier#8,count#134L])
   +- Exchange hashpartitioning(Origin#16, UniqueCarrier#8, 200)
      +- *HashAggregate(key=[Origin#16,UniqueCarrier#8], 
functions=[partial_count(1)], output=[Origin#16,UniqueCarrier#8,count#296L])
         +- *Project [UniqueCarrier#8, Origin#16]
            +- *Filter (((((((isnotnull(Origin#16) && 
isnotnull(UniqueCarrier#8)) && isnotnull(Cancelled#21)) && 
isnotnull(CancellationCode#22)) && NOT (Cancelled#21 = 0)) && 
(CancellationCode#22 = A)) && isnotnull(Dest#17)) && (Dest#17 = ORD))
               +- *Scan csv 
[UniqueCarrier#8,Origin#16,Dest#17,Cancelled#21,CancellationCode#22] Format: 
CSV, InputPaths: file:/home/robbins/brandberry/2008.csv, PushedFilters: 
[IsNotNull(Origin), IsNotNull(UniqueCarrier), IsNotNull(Cancelled), 
IsNotNull(CancellationCode), ..., ReadSchema: 
struct<UniqueCarrier:string,Origin:string,Dest:string,Cancelled:int,CancellationCode:string>

Generated code:
/* Entry point of the generated code: wraps the given references array in a new GeneratedIterator. */
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/*
 * Generated iterator for this subtree (final hash aggregate feeding a sort).
 *
 * What the code below does:
 *   - init() binds the plan objects and metrics from references[] by position and allocates
 *     two reusable rows (agg_result with 2 fields, agg_result1 with 3 fields).
 *   - agg_doAggregateWithKeys() folds every input row into agg_hashMap, keyed by the two
 *     string columns at ordinals 0 and 1, summing the long at ordinal 2 into buffer slot 0.
 *   - sort_addToSorter() runs that aggregation once and copies each (key0, key1, sum) row
 *     into sort_sorter.
 *   - processNext() sorts once and then appends the sorted rows to the output.
 */
/* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
          // State of the sort stage.
/* 007 */   private boolean sort_needToSort;
/* 008 */   private org.apache.spark.sql.execution.SortExec sort_plan;
/* 009 */   private org.apache.spark.sql.execution.UnsafeExternalRowSorter 
sort_sorter;
/* 010 */   private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 011 */   private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
          // State of the aggregation stage. agg_bufIsNull and agg_bufValue are declared but not
          // referenced anywhere else in this class.
/* 012 */   private boolean agg_initAgg;
/* 013 */   private boolean agg_bufIsNull;
/* 014 */   private long agg_bufValue;
/* 015 */   private org.apache.spark.sql.execution.aggregate.HashAggregateExec 
agg_plan;
/* 016 */   private 
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 017 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter 
agg_sorter;
/* 018 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 019 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_peakMemory;
/* 020 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_spillSize;
/* 021 */   private scala.collection.Iterator inputadapter_input;
          // agg_result / agg_holder / agg_rowWriter build the 2-field grouping key;
          // agg_result1 / agg_holder1 / agg_rowWriter1 build the 3-field row handed to the sorter.
/* 022 */   private UnsafeRow agg_result;
/* 023 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 024 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 025 */   private UnsafeRow agg_result1;
/* 026 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder1;
/* 027 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
agg_rowWriter1;
          // Metrics updated by sort_addToSorter() and processNext().
/* 028 */   private org.apache.spark.sql.execution.metric.SQLMetric 
sort_numOutputRows;
/* 029 */   private org.apache.spark.sql.execution.metric.SQLMetric 
sort_aggTime;
/* 030 */   private org.apache.spark.sql.execution.metric.SQLMetric 
sort_peakMemory;
/* 031 */   private org.apache.spark.sql.execution.metric.SQLMetric 
sort_spillSize;
/* 032 */   private org.apache.spark.sql.execution.metric.SQLMetric 
sort_sortTime;
/* 033 */
          // Only stores the references array; all other setup happens in init().
/* 034 */   public GeneratedIterator(Object[] references) {
/* 035 */     this.references = references;
/* 036 */   }
/* 037 */
          // Prepares the iterator for one partition: records the partition index, creates the
          // sorter from the SortExec plan (references[0]), binds the aggregate plan and metrics
          // from references[1..8], takes inputs[0] as the row source, and allocates the
          // reusable output rows with their holders and writers.
/* 038 */   public void init(int index, scala.collection.Iterator inputs[]) {
/* 039 */     partitionIndex = index;
/* 040 */     sort_needToSort = true;
/* 041 */     this.sort_plan = (org.apache.spark.sql.execution.SortExec) 
references[0];
/* 042 */     sort_sorter = sort_plan.createSorter();
/* 043 */     sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 044 */
/* 045 */     agg_initAgg = false;
/* 046 */
/* 047 */     this.agg_plan = 
(org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[1];
/* 048 */
/* 049 */     this.agg_peakMemory = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 050 */     this.agg_spillSize = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 051 */     inputadapter_input = inputs[0];
/* 052 */     agg_result = new UnsafeRow(2);
/* 053 */     this.agg_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 64);
/* 054 */     this.agg_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 
2);
/* 055 */     agg_result1 = new UnsafeRow(3);
/* 056 */     this.agg_holder1 = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result1, 64);
/* 057 */     this.agg_rowWriter1 = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder1, 
3);
/* 058 */     this.sort_numOutputRows = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[4];
/* 059 */     this.sort_aggTime = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[5];
/* 060 */     this.sort_peakMemory = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[6];
/* 061 */     this.sort_spillSize = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[7];
/* 062 */     this.sort_sortTime = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[8];
/* 063 */   }
/* 064 */
          // Consumes inputadapter_input and merges every row into agg_hashMap.
          //
          // Per row: reads two nullable UTF8String keys (ordinals 0 and 1) and a long (ordinal 2),
          // writes the keys into agg_result, hashes the non-null keys, looks up the aggregation
          // buffer for that key, and adds the long to buffer slot 0. When the lookup returns
          // null the map is turned into an external sorter (merged into agg_sorter if one
          // already exists) and the lookup is retried once; a second null throws
          // OutOfMemoryError. After the loop, agg_mapIter is obtained from finishAggregate().
/* 065 */   private void agg_doAggregateWithKeys() throws java.io.IOException {
/* 066 */     agg_hashMap = agg_plan.createHashMap();
/* 067 */
/* 068 */     while (inputadapter_input.hasNext()) {
/* 069 */       InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
/* 070 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 071 */       UTF8String inputadapter_value = inputadapter_isNull ? null : 
(inputadapter_row.getUTF8String(0));
/* 072 */       boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 073 */       UTF8String inputadapter_value1 = inputadapter_isNull1 ? null : 
(inputadapter_row.getUTF8String(1));
          // Ordinal 2 is read without a null check.
/* 074 */       long inputadapter_value2 = inputadapter_row.getLong(2);
/* 075 */
/* 076 */       UnsafeRow agg_unsafeRowAggBuffer = null;
          // agg_vectorizedAggBuffer is never assigned after this declaration, so in this
          // method the "== null" branch below always runs and the "!= null" branch never does.
/* 077 */       org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row 
agg_vectorizedAggBuffer = null;
/* 078 */
/* 079 */       if (agg_vectorizedAggBuffer == null) {
/* 080 */         // generate grouping key
/* 081 */         agg_holder.reset();
/* 082 */
/* 083 */         agg_rowWriter.zeroOutNullBytes();
/* 084 */
/* 085 */         if (inputadapter_isNull) {
/* 086 */           agg_rowWriter.setNullAt(0);
/* 087 */         } else {
/* 088 */           agg_rowWriter.write(0, inputadapter_value);
/* 089 */         }
/* 090 */
/* 091 */         if (inputadapter_isNull1) {
/* 092 */           agg_rowWriter.setNullAt(1);
/* 093 */         } else {
/* 094 */           agg_rowWriter.write(1, inputadapter_value1);
/* 095 */         }
/* 096 */         agg_result.setTotalSize(agg_holder.totalSize());
          // Key hash: starts at 42 and is chained through hashUnsafeBytes for each non-null
          // key, passing the running value as the last argument; null keys leave it unchanged.
/* 097 */         int agg_value6 = 42;
/* 098 */
/* 099 */         if (!inputadapter_isNull) {
/* 100 */           agg_value6 = 
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value.getBaseObject(),
 inputadapter_value.getBaseOffset(), inputadapter_value.numBytes(), agg_value6);
/* 101 */         }
/* 102 */
/* 103 */         if (!inputadapter_isNull1) {
/* 104 */           agg_value6 = 
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value1.getBaseObject(),
 inputadapter_value1.getBaseOffset(), inputadapter_value1.numBytes(), 
agg_value6);
/* 105 */         }
/* 106 */         if (true) {
/* 107 */           // try to get the buffer from hash map
/* 108 */           agg_unsafeRowAggBuffer =
/* 109 */           agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, 
agg_value6);
/* 110 */         }
/* 111 */         if (agg_unsafeRowAggBuffer == null) {
          // First spill creates agg_sorter; later spills are merged into it.
/* 112 */           if (agg_sorter == null) {
/* 113 */             agg_sorter = 
agg_hashMap.destructAndCreateExternalSorter();
/* 114 */           } else {
/* 115 */             
agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 116 */           }
/* 117 */
/* 118 */           // the hash map had be spilled, it should have enough 
memory now,
/* 119 */           // try  to allocate buffer again.
/* 120 */           agg_unsafeRowAggBuffer =
/* 121 */           agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, 
agg_value6);
/* 122 */           if (agg_unsafeRowAggBuffer == null) {
/* 123 */             // failed to allocate the first page
/* 124 */             throw new OutOfMemoryError("No enough memory for 
aggregation");
/* 125 */           }
/* 126 */         }
/* 127 */       }
/* 128 */
/* 129 */       if (agg_vectorizedAggBuffer != null) {
/* 130 */         // update vectorized row
/* 131 */
/* 132 */       } else {
/* 133 */         // update unsafe row
/* 134 */
/* 135 */         // common sub-expressions
/* 136 */
/* 137 */         // evaluate aggregate function
/* 138 */         long agg_value10 = agg_unsafeRowAggBuffer.getLong(0);
/* 139 */
          // The -1L initial value is overwritten unconditionally on the next line.
/* 140 */         long agg_value9 = -1L;
/* 141 */         agg_value9 = agg_value10 + inputadapter_value2;
/* 142 */         // update unsafe row buffer
/* 143 */         agg_unsafeRowAggBuffer.setLong(0, agg_value9);
/* 144 */
/* 145 */       }
          // NOTE(review): returning here would skip the finishAggregate() call below, so
          // agg_mapIter would not be assigned by this invocation - confirm that shouldStop()
          // cannot be true at this point.
/* 146 */       if (shouldStop()) return;
/* 147 */     }
/* 148 */
/* 149 */     agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter, 
agg_peakMemory, agg_spillSize);
/* 150 */   }
/* 151 */
          // Feeds the aggregated rows into sort_sorter.
          //
          // The aggregation itself runs only once (guarded by agg_initAgg) and its wall-clock
          // time, divided by 1000000, is added to sort_aggTime. Each (key, buffer) pair from
          // agg_mapIter is then rewritten into agg_result1 as (key field 0, key field 1,
          // buffer long 0) and inserted into the sorter. When the iterator is exhausted it is
          // closed, and agg_hashMap is freed only if no external sorter was created.
/* 152 */   private void sort_addToSorter() throws java.io.IOException {
/* 153 */     if (!agg_initAgg) {
/* 154 */       agg_initAgg = true;
/* 155 */       long sort_beforeAgg = System.nanoTime();
/* 156 */       agg_doAggregateWithKeys();
/* 157 */       sort_aggTime.add((System.nanoTime() - sort_beforeAgg) / 
1000000);
/* 158 */     }
/* 159 */
/* 160 */     // output the result
/* 161 */
/* 162 */     while (agg_mapIter.next()) {
/* 163 */       sort_numOutputRows.add(1);
/* 164 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 165 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 166 */
/* 167 */       boolean agg_isNull11 = agg_aggKey.isNullAt(0);
/* 168 */       UTF8String agg_value12 = agg_isNull11 ? null : 
(agg_aggKey.getUTF8String(0));
/* 169 */       boolean agg_isNull12 = agg_aggKey.isNullAt(1);
/* 170 */       UTF8String agg_value13 = agg_isNull12 ? null : 
(agg_aggKey.getUTF8String(1));
/* 171 */       long agg_value14 = agg_aggBuffer.getLong(0);
/* 172 */
          // agg_result1 is reused for every row: reset the holder and null bits before writing.
/* 173 */       agg_holder1.reset();
/* 174 */
/* 175 */       agg_rowWriter1.zeroOutNullBytes();
/* 176 */
/* 177 */       if (agg_isNull11) {
/* 178 */         agg_rowWriter1.setNullAt(0);
/* 179 */       } else {
/* 180 */         agg_rowWriter1.write(0, agg_value12);
/* 181 */       }
/* 182 */
/* 183 */       if (agg_isNull12) {
/* 184 */         agg_rowWriter1.setNullAt(1);
/* 185 */       } else {
/* 186 */         agg_rowWriter1.write(1, agg_value13);
/* 187 */       }
/* 188 */
/* 189 */       agg_rowWriter1.write(2, agg_value14);
/* 190 */       agg_result1.setTotalSize(agg_holder1.totalSize());
/* 191 */       sort_sorter.insertRow((UnsafeRow)agg_result1);
/* 192 */
          // NOTE(review): an early return here leaves agg_mapIter unclosed and the sorter
          // partially filled, while processNext() goes on to call sort() - confirm that
          // shouldStop() cannot be true at this point.
/* 193 */       if (shouldStop()) return;
/* 194 */     }
/* 195 */
/* 196 */     agg_mapIter.close();
/* 197 */     if (agg_sorter == null) {
/* 198 */       agg_hashMap.free();
/* 199 */     }
/* 200 */
/* 201 */   }
/* 202 */
          // Emits the sorted output. While sort_needToSort is set it fills the sorter, sorts,
          // and records sort time, peak memory and the growth in spilled bytes; the flag is then
          // cleared so this happens once. Each sorted row is appended until the iterator is
          // exhausted or shouldStop() reports true.
/* 203 */   protected void processNext() throws java.io.IOException {
/* 204 */     if (sort_needToSort) {
          // Snapshot the spill counter so that only the bytes spilled during this sort are reported.
/* 205 */       long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 206 */       sort_addToSorter();
/* 207 */       sort_sortedIter = sort_sorter.sort();
/* 208 */       sort_sortTime.add(sort_sorter.getSortTimeNanos() / 1000000);
/* 209 */       sort_peakMemory.add(sort_sorter.getPeakMemoryUsage());
/* 210 */       sort_spillSize.add(sort_metrics.memoryBytesSpilled() - 
sort_spillSizeBefore);
/* 211 */       
sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 212 */       sort_needToSort = false;
/* 213 */     }
/* 214 */
/* 215 */     while (sort_sortedIter.hasNext()) {
/* 216 */       UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 217 */
/* 218 */       append(sort_outputRow);
/* 219 */
/* 220 */       if (shouldStop()) return;
/* 221 */     }
/* 222 */   }
/* 223 */ }


{code}

> segmentation violation in o.a.s.unsafe.types.UTF8String 
> --------------------------------------------------------
>
>                 Key: SPARK-15822
>                 URL: https://issues.apache.org/jira/browse/SPARK-15822
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>         Environment: linux amd64
> openjdk version "1.8.0_91"
> OpenJDK Runtime Environment (build 1.8.0_91-b14)
> OpenJDK 64-Bit Server VM (build 25.91-b14, mixed mode)
>            Reporter: Pete Robbins
>            Assignee: Herman van Hovell
>            Priority: Blocker
>
> Executors fail with a segmentation violation while running an application with
> spark.memory.offHeap.enabled true
> spark.memory.offHeap.size 512m
> Also now reproduced with 
> spark.memory.offHeap.enabled false
> {noformat}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x00007f4559b4d4bd, pid=14182, tid=139935319750400
> #
> # JRE version: OpenJDK Runtime Environment (8.0_91-b14) (build 1.8.0_91-b14)
> # Java VM: OpenJDK 64-Bit Server VM (25.91-b14 mixed mode linux-amd64 
> compressed oops)
> # Problematic frame:
> # J 4816 C2 
> org.apache.spark.unsafe.types.UTF8String.compareTo(Lorg/apache/spark/unsafe/types/UTF8String;)I
>  (64 bytes) @ 0x00007f4559b4d4bd [0x00007f4559b4d460+0x5d]
> {noformat}
> We initially saw this with IBM Java on a PowerPC box, but it is also reproducible on 
> Linux with OpenJDK. On Linux with IBM Java 8 we see a NullPointerException at the 
> same code point:
> {noformat}
> 16/06/08 11:14:58 ERROR Executor: Exception in task 1.0 in stage 5.0 (TID 48)
> java.lang.NullPointerException
>       at 
> org.apache.spark.unsafe.types.UTF8String.compareTo(UTF8String.java:831)
>       at org.apache.spark.unsafe.types.UTF8String.compare(UTF8String.java:844)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.findNextInnerJoinRows$(Unknown
>  Source)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>       at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$doExecute$2$$anon$2.hasNext(WholeStageCodegenExec.scala:377)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>       at 
> scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
>       at org.spark_project.guava.collect.Ordering.leastOf(Ordering.java:664)
>       at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1365)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1362)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:757)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:757)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>       at org.apache.spark.scheduler.Task.run(Task.scala:85)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>       at java.lang.Thread.run(Thread.java:785)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to