[ https://issues.apache.org/jira/browse/SPARK-25582 ]
Thomas Brugiere reopened SPARK-25582:
-------------------------------------

Error in Spark logs when using the org.apache.spark:spark-sql_2.11:2.2.0 Java library
--------------------------------------------------------------------------------------

                 Key: SPARK-25582
                 URL: https://issues.apache.org/jira/browse/SPARK-25582
             Project: Spark
          Issue Type: Bug
          Components: Java API
    Affects Versions: 2.2.0
            Reporter: Thomas Brugiere
            Priority: Major
         Attachments: fileA.csv, fileB.csv, fileC.csv

I have noticed an error that appears in the Spark logs when using the Spark SQL library in a Java 8 project. When I run the code below with the attached files as input, the ERROR below shows up in the application logs.

I am using the *org.apache.spark:spark-sql_2.11:2.2.0* library in my Java project.

Note that the same logic implemented with the Python API (pyspark) does not produce any exception like this.

*Code*
{code:java}
SparkConf conf = new SparkConf().setAppName("SparkBug").setMaster("local");
SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();

Dataset<Row> df_a = sparkSession.read().option("header", true).csv("local/fileA.csv").dropDuplicates();
Dataset<Row> df_b = sparkSession.read().option("header", true).csv("local/fileB.csv").dropDuplicates();
Dataset<Row> df_c = sparkSession.read().option("header", true).csv("local/fileC.csv").dropDuplicates();

String[] key_join_1 = new String[]{"colA", "colB", "colC", "colD", "colE", "colF"};
String[] key_join_2 = new String[]{"colA", "colB", "colC", "colD", "colE"};

// arrayToSeq wraps the Java array of join columns as the Scala Seq expected by Dataset.join
Dataset<Row> df_inventory_1 = df_a.join(df_b, arrayToSeq(key_join_1), "left");
Dataset<Row> df_inventory_2 = df_inventory_1.join(df_c, arrayToSeq(key_join_2), "left");

df_inventory_2.show();
{code}
(The arrayToSeq helper is not shown in the snippet; a possible sketch is given after the log output below.)

*Error message*
{code:java}
18/10/01 09:58:07 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 202, Column 18: Expression "agg_isNull_28" is not an rvalue
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 202, Column 18: Expression "agg_isNull_28" is not an rvalue
  at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11821)
  at org.codehaus.janino.UnitCompiler.toRvalueOrCompileException(UnitCompiler.java:7170)
  at org.codehaus.janino.UnitCompiler.getConstantValue2(UnitCompiler.java:5332)
  at org.codehaus.janino.UnitCompiler.access$9400(UnitCompiler.java:212)
  at org.codehaus.janino.UnitCompiler$13$1.visitAmbiguousName(UnitCompiler.java:5287)
  at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4053)
  at org.codehaus.janino.UnitCompiler$13.visitLvalue(UnitCompiler.java:5284)
  at org.codehaus.janino.Java$Lvalue.accept(Java.java:3977)
  at org.codehaus.janino.UnitCompiler.getConstantValue(UnitCompiler.java:5280)
  at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2391)
  at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:212)
  at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1474)
  at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1466)
  at org.codehaus.janino.Java$IfStatement.accept(Java.java:2926)
  at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
  at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1546)
  at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3075)
  at
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1336) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1309) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:799) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:958) > at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:212) > at > org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:393) > at > org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:385) > at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1286) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:385) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1285) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:825) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:411) > at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:212) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:390) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:385) > at > org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1405) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:385) > at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:357) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234) > at > org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446) > at > org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313) > at > org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204) > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1417) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1493) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1490) > at > org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) > at > org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) > at > org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) > at > org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) > at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) > at > org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004) > at > org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1365) > at > org.apache.spark.sql.execution.WholeStageCodegenExec.liftedTree1$1(WholeStageCodegenExec.scala:579) > at > org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:578) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > at > 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247) > at > org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337) > at > org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) > at > org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278) > at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489) > at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489) > at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) > at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258) > at org.apache.spark.sql.Dataset.head(Dataset.scala:2489) > at org.apache.spark.sql.Dataset.take(Dataset.scala:2703) > at org.apache.spark.sql.Dataset.showString(Dataset.scala:254) > at org.apache.spark.sql.Dataset.show(Dataset.scala:723) > at org.apache.spark.sql.Dataset.show(Dataset.scala:682) > at org.apache.spark.sql.Dataset.show(Dataset.scala:691) > at SparkBug.main(SparkBug.java:30) > 18/10/01 09:58:07 INFO CodeGenerator: > /* 001 */ public Object generate(Object[] references) { > /* 002 */ return new GeneratedIteratorForCodegenStage6(references); > /* 003 */ } > /* 004 */ > /* 005 */ final class GeneratedIteratorForCodegenStage6 extends > org.apache.spark.sql.execution.BufferedRowIterator { > /* 006 */ private Object[] references; > /* 007 */ private scala.collection.Iterator[] inputs; > /* 008 */ private boolean agg_initAgg_0; > /* 009 */ private org.apache.spark.unsafe.KVIterator agg_mapIter_0; > /* 010 */ private > org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap_0; > /* 011 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter > agg_sorter_0; > /* 012 */ private scala.collection.Iterator inputadapter_input_0; > /* 013 */ private org.apache.spark.sql.execution.joins.UnsafeHashedRelation > bhj_relation_0; > /* 014 */ private org.apache.spark.sql.execution.joins.UnsafeHashedRelation > bhj_relation_1; > /* 015 */ private > org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] > agg_mutableStateArray_1 = new > org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[8]; > /* 016 */ private > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] > agg_mutableStateArray_2 = new > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[8]; > /* 017 */ private UnsafeRow[] agg_mutableStateArray_0 = new UnsafeRow[8]; > /* 018 */ > /* 019 */ public GeneratedIteratorForCodegenStage6(Object[] references) { > /* 020 */ this.references = references; > /* 021 */ } > /* 022 */ > /* 023 */ public void init(int index, scala.collection.Iterator[] inputs) { > /* 024 */ partitionIndex = index; > /* 025 */ this.inputs = inputs; > /* 026 */ wholestagecodegen_init_0_0(); > /* 027 */ wholestagecodegen_init_0_1(); > /* 028 */ wholestagecodegen_init_0_2(); > /* 029 */ > /* 030 */ } > /* 031 */ > /* 032 */ private void wholestagecodegen_init_0_2() { > /* 033 */ agg_mutableStateArray_0[5] = new UnsafeRow(5); > /* 034 */ agg_mutableStateArray_1[5] = new > org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray_0[5], > 160); > /* 035 */ 
agg_mutableStateArray_2[5] = new > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray_1[5], > 5); > /* 036 */ agg_mutableStateArray_0[6] = new UnsafeRow(23); > /* 037 */ agg_mutableStateArray_1[6] = new > org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray_0[6], > 736); > /* 038 */ agg_mutableStateArray_2[6] = new > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray_1[6], > 23); > /* 039 */ agg_mutableStateArray_0[7] = new UnsafeRow(18); > /* 040 */ agg_mutableStateArray_1[7] = new > org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray_0[7], > 576); > /* 041 */ agg_mutableStateArray_2[7] = new > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray_1[7], > 18); > /* 042 */ > /* 043 */ } > /* 044 */ > /* 045 */ private void agg_doAggregateWithKeysOutput_0(UnsafeRow > agg_keyTerm_0, UnsafeRow agg_bufferTerm_0) > /* 046 */ throws java.io.IOException { > /* 047 */ ((org.apache.spark.sql.execution.metric.SQLMetric) > references[4] /* numOutputRows */).add(1); > /* 048 */ > /* 049 */ boolean agg_isNull_22 = agg_keyTerm_0.isNullAt(1); > /* 050 */ UTF8String agg_value_22 = agg_isNull_22 ? null : > (agg_keyTerm_0.getUTF8String(1)); > /* 051 */ boolean agg_isNull_23 = agg_keyTerm_0.isNullAt(6); > /* 052 */ UTF8String agg_value_23 = agg_isNull_23 ? null : > (agg_keyTerm_0.getUTF8String(6)); > /* 053 */ boolean agg_isNull_24 = agg_keyTerm_0.isNullAt(3); > /* 054 */ UTF8String agg_value_24 = agg_isNull_24 ? null : > (agg_keyTerm_0.getUTF8String(3)); > /* 055 */ boolean agg_isNull_25 = agg_keyTerm_0.isNullAt(4); > /* 056 */ UTF8String agg_value_25 = agg_isNull_25 ? null : > (agg_keyTerm_0.getUTF8String(4)); > /* 057 */ boolean agg_isNull_26 = agg_keyTerm_0.isNullAt(2); > /* 058 */ UTF8String agg_value_26 = agg_isNull_26 ? null : > (agg_keyTerm_0.getUTF8String(2)); > /* 059 */ boolean agg_isNull_27 = agg_keyTerm_0.isNullAt(0); > /* 060 */ UTF8String agg_value_27 = agg_isNull_27 ? 
null : > (agg_keyTerm_0.getUTF8String(0)); > /* 061 */ > /* 062 */ // generate join key for stream side > /* 063 */ > /* 064 */ agg_mutableStateArray_1[2].reset(); > /* 065 */ > /* 066 */ agg_mutableStateArray_2[2].zeroOutNullBytes(); > /* 067 */ > /* 068 */ if (agg_isNull_22) { > /* 069 */ agg_mutableStateArray_2[2].setNullAt(0); > /* 070 */ } else { > /* 071 */ agg_mutableStateArray_2[2].write(0, agg_value_22); > /* 072 */ } > /* 073 */ > /* 074 */ if (agg_isNull_23) { > /* 075 */ agg_mutableStateArray_2[2].setNullAt(1); > /* 076 */ } else { > /* 077 */ agg_mutableStateArray_2[2].write(1, agg_value_23); > /* 078 */ } > /* 079 */ > /* 080 */ if (agg_isNull_24) { > /* 081 */ agg_mutableStateArray_2[2].setNullAt(2); > /* 082 */ } else { > /* 083 */ agg_mutableStateArray_2[2].write(2, agg_value_24); > /* 084 */ } > /* 085 */ > /* 086 */ if (agg_isNull_25) { > /* 087 */ agg_mutableStateArray_2[2].setNullAt(3); > /* 088 */ } else { > /* 089 */ agg_mutableStateArray_2[2].write(3, agg_value_25); > /* 090 */ } > /* 091 */ > /* 092 */ if (agg_isNull_26) { > /* 093 */ agg_mutableStateArray_2[2].setNullAt(4); > /* 094 */ } else { > /* 095 */ agg_mutableStateArray_2[2].write(4, agg_value_26); > /* 096 */ } > /* 097 */ > /* 098 */ if (agg_isNull_27) { > /* 099 */ agg_mutableStateArray_2[2].setNullAt(5); > /* 100 */ } else { > /* 101 */ agg_mutableStateArray_2[2].write(5, agg_value_27); > /* 102 */ } > /* 103 */ > agg_mutableStateArray_0[2].setTotalSize(agg_mutableStateArray_1[2].totalSize()); > /* 104 */ > /* 105 */ // find matches from HashedRelation > /* 106 */ UnsafeRow bhj_matched_0 = agg_mutableStateArray_0[2].anyNull() > ? null: (UnsafeRow)bhj_relation_0.getValue(agg_mutableStateArray_0[2]); > /* 107 */ final boolean bhj_conditionPassed_0 = true; > /* 108 */ if (!bhj_conditionPassed_0) { > /* 109 */ bhj_matched_0 = null; > /* 110 */ // reset the variables those are already evaluated. > /* 111 */ > /* 112 */ } > /* 113 */ ((org.apache.spark.sql.execution.metric.SQLMetric) > references[7] /* numOutputRows */).add(1); > /* 114 */ > /* 115 */ // generate join key for stream side > /* 116 */ > /* 117 */ agg_mutableStateArray_1[5].reset(); > /* 118 */ > /* 119 */ agg_mutableStateArray_2[5].zeroOutNullBytes(); > /* 120 */ > /* 121 */ if (agg_isNull_22) { > /* 122 */ agg_mutableStateArray_2[5].setNullAt(0); > /* 123 */ } else { > /* 124 */ agg_mutableStateArray_2[5].write(0, agg_value_22); > /* 125 */ } > /* 126 */ > /* 127 */ if (agg_isNull_23) { > /* 128 */ agg_mutableStateArray_2[5].setNullAt(1); > /* 129 */ } else { > /* 130 */ agg_mutableStateArray_2[5].write(1, agg_value_23); > /* 131 */ } > /* 132 */ > /* 133 */ if (agg_isNull_24) { > /* 134 */ agg_mutableStateArray_2[5].setNullAt(2); > /* 135 */ } else { > /* 136 */ agg_mutableStateArray_2[5].write(2, agg_value_24); > /* 137 */ } > /* 138 */ > /* 139 */ if (agg_isNull_25) { > /* 140 */ agg_mutableStateArray_2[5].setNullAt(3); > /* 141 */ } else { > /* 142 */ agg_mutableStateArray_2[5].write(3, agg_value_25); > /* 143 */ } > /* 144 */ > /* 145 */ if (agg_isNull_26) { > /* 146 */ agg_mutableStateArray_2[5].setNullAt(4); > /* 147 */ } else { > /* 148 */ agg_mutableStateArray_2[5].write(4, agg_value_26); > /* 149 */ } > /* 150 */ > agg_mutableStateArray_0[5].setTotalSize(agg_mutableStateArray_1[5].totalSize()); > /* 151 */ > /* 152 */ // find matches from HashedRelation > /* 153 */ UnsafeRow bhj_matched_1 = agg_mutableStateArray_0[5].anyNull() > ? 
null: (UnsafeRow)bhj_relation_1.getValue(agg_mutableStateArray_0[5]); > /* 154 */ final boolean bhj_conditionPassed_1 = true; > /* 155 */ if (!bhj_conditionPassed_1) { > /* 156 */ bhj_matched_1 = null; > /* 157 */ // reset the variables those are already evaluated. > /* 158 */ > /* 159 */ } > /* 160 */ ((org.apache.spark.sql.execution.metric.SQLMetric) > references[10] /* numOutputRows */).add(1); > /* 161 */ > /* 162 */ agg_mutableStateArray_1[7].reset(); > /* 163 */ > /* 164 */ agg_mutableStateArray_2[7].zeroOutNullBytes(); > /* 165 */ > /* 166 */ if (agg_isNull_22) { > /* 167 */ agg_mutableStateArray_2[7].setNullAt(0); > /* 168 */ } else { > /* 169 */ agg_mutableStateArray_2[7].write(0, agg_value_22); > /* 170 */ } > /* 171 */ > /* 172 */ if (agg_isNull_23) { > /* 173 */ agg_mutableStateArray_2[7].setNullAt(1); > /* 174 */ } else { > /* 175 */ agg_mutableStateArray_2[7].write(1, agg_value_23); > /* 176 */ } > /* 177 */ > /* 178 */ if (agg_isNull_24) { > /* 179 */ agg_mutableStateArray_2[7].setNullAt(2); > /* 180 */ } else { > /* 181 */ agg_mutableStateArray_2[7].write(2, agg_value_24); > /* 182 */ } > /* 183 */ > /* 184 */ if (agg_isNull_25) { > /* 185 */ agg_mutableStateArray_2[7].setNullAt(3); > /* 186 */ } else { > /* 187 */ agg_mutableStateArray_2[7].write(3, agg_value_25); > /* 188 */ } > /* 189 */ > /* 190 */ if (agg_isNull_26) { > /* 191 */ agg_mutableStateArray_2[7].setNullAt(4); > /* 192 */ } else { > /* 193 */ agg_mutableStateArray_2[7].write(4, agg_value_26); > /* 194 */ } > /* 195 */ > /* 196 */ if (agg_isNull_27) { > /* 197 */ agg_mutableStateArray_2[7].setNullAt(5); > /* 198 */ } else { > /* 199 */ agg_mutableStateArray_2[7].write(5, agg_value_27); > /* 200 */ } > /* 201 */ > /* 202 */ if (agg_isNull_28) { > /* 203 */ agg_mutableStateArray_2[7].setNullAt(6); > /* 204 */ } else { > /* 205 */ agg_mutableStateArray_2[7].write(6, agg_value_28); > /* 206 */ } > /* 207 */ > /* 208 */ if (bhj_isNull_19) { > /* 209 */ agg_mutableStateArray_2[7].setNullAt(7); > /* 210 */ } else { > /* 211 */ agg_mutableStateArray_2[7].write(7, bhj_value_19); > /* 212 */ } > /* 213 */ > /* 214 */ if (bhj_isNull_21) { > /* 215 */ agg_mutableStateArray_2[7].setNullAt(8); > /* 216 */ } else { > /* 217 */ agg_mutableStateArray_2[7].write(8, bhj_value_21); > /* 218 */ } > /* 219 */ > /* 220 */ if (bhj_isNull_23) { > /* 221 */ agg_mutableStateArray_2[7].setNullAt(9); > /* 222 */ } else { > /* 223 */ agg_mutableStateArray_2[7].write(9, bhj_value_23); > /* 224 */ } > /* 225 */ > /* 226 */ if (bhj_isNull_25) { > /* 227 */ agg_mutableStateArray_2[7].setNullAt(10); > /* 228 */ } else { > /* 229 */ agg_mutableStateArray_2[7].write(10, bhj_value_25); > /* 230 */ } > /* 231 */ > /* 232 */ if (bhj_isNull_27) { > /* 233 */ agg_mutableStateArray_2[7].setNullAt(11); > /* 234 */ } else { > /* 235 */ agg_mutableStateArray_2[7].write(11, bhj_value_27); > /* 236 */ } > /* 237 */ > /* 238 */ if (bhj_isNull_29) { > /* 239 */ agg_mutableStateArray_2[7].setNullAt(12); > /* 240 */ } else { > /* 241 */ agg_mutableStateArray_2[7].write(12, bhj_value_29); > /* 242 */ } > /* 243 */ > /* 244 */ if (bhj_isNull_31) { > /* 245 */ agg_mutableStateArray_2[7].setNullAt(13); > /* 246 */ } else { > /* 247 */ agg_mutableStateArray_2[7].write(13, bhj_value_31); > /* 248 */ } > /* 249 */ > /* 250 */ if (bhj_isNull_68) { > /* 251 */ agg_mutableStateArray_2[7].setNullAt(14); > /* 252 */ } else { > /* 253 */ agg_mutableStateArray_2[7].write(14, bhj_value_68); > /* 254 */ } > /* 255 */ > /* 256 */ if (bhj_isNull_70) { > /* 257 */ 
agg_mutableStateArray_2[7].setNullAt(15); > /* 258 */ } else { > /* 259 */ agg_mutableStateArray_2[7].write(15, bhj_value_70); > /* 260 */ } > /* 261 */ > /* 262 */ if (bhj_isNull_72) { > /* 263 */ agg_mutableStateArray_2[7].setNullAt(16); > /* 264 */ } else { > /* 265 */ agg_mutableStateArray_2[7].write(16, bhj_value_72); > /* 266 */ } > /* 267 */ > /* 268 */ if (bhj_isNull_74) { > /* 269 */ agg_mutableStateArray_2[7].setNullAt(17); > /* 270 */ } else { > /* 271 */ agg_mutableStateArray_2[7].write(17, bhj_value_74); > /* 272 */ } > /* 273 */ > agg_mutableStateArray_0[7].setTotalSize(agg_mutableStateArray_1[7].totalSize()); > /* 274 */ append(agg_mutableStateArray_0[7]); > /* 275 */ > /* 276 */ } > /* 277 */ > /* 278 */ private void wholestagecodegen_init_0_1() { > /* 279 */ agg_mutableStateArray_0[2] = new UnsafeRow(6); > /* 280 */ agg_mutableStateArray_1[2] = new > org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray_0[2], > 192); > /* 281 */ agg_mutableStateArray_2[2] = new > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray_1[2], > 6); > /* 282 */ agg_mutableStateArray_0[3] = new UnsafeRow(20); > /* 283 */ agg_mutableStateArray_1[3] = new > org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray_0[3], > 640); > /* 284 */ agg_mutableStateArray_2[3] = new > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray_1[3], > 20); > /* 285 */ agg_mutableStateArray_0[4] = new UnsafeRow(14); > /* 286 */ agg_mutableStateArray_1[4] = new > org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray_0[4], > 448); > /* 287 */ agg_mutableStateArray_2[4] = new > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray_1[4], > 14); > /* 288 */ > /* 289 */ bhj_relation_1 = > ((org.apache.spark.sql.execution.joins.UnsafeHashedRelation) > ((org.apache.spark.broadcast.TorrentBroadcast) references[8] /* broadcast > */).value()).asReadOnlyCopy(); > /* 290 */ incPeakExecutionMemory(bhj_relation_1.estimatedSize()); > /* 291 */ > /* 292 */ > org.apache.spark.TaskContext$.MODULE$.get().addTaskCompletionListener(new > org.apache.spark.util.TaskCompletionListener() { > /* 293 */ @Override > /* 294 */ public void onTaskCompletion(org.apache.spark.TaskContext > context) { > /* 295 */ ((org.apache.spark.sql.execution.metric.SQLMetric) > references[9] /* avgHashProbe > */).set(bhj_relation_1.getAverageProbesPerLookup()); > /* 296 */ } > /* 297 */ }); > /* 298 */ > /* 299 */ } > /* 300 */ > /* 301 */ private void agg_doConsume_0(InternalRow inputadapter_row_0, > UTF8String agg_expr_0_0, boolean agg_exprIsNull_0_0, UTF8String agg_expr_1_0, > boolean agg_exprIsNull_1_0, UTF8String agg_expr_2_0, boolean > agg_exprIsNull_2_0, UTF8String agg_expr_3_0, boolean agg_exprIsNull_3_0, > UTF8String agg_expr_4_0, boolean agg_exprIsNull_4_0, UTF8String agg_expr_5_0, > boolean agg_exprIsNull_5_0, UTF8String agg_expr_6_0, boolean > agg_exprIsNull_6_0) throws java.io.IOException { > /* 302 */ UnsafeRow agg_unsafeRowAggBuffer_0 = null; > /* 303 */ > /* 304 */ // generate grouping key > /* 305 */ agg_mutableStateArray_1[0].reset(); > /* 306 */ > /* 307 */ agg_mutableStateArray_2[0].zeroOutNullBytes(); > /* 308 */ > /* 309 */ if (agg_exprIsNull_0_0) { > /* 310 */ agg_mutableStateArray_2[0].setNullAt(0); > /* 311 */ } else { > /* 312 */ agg_mutableStateArray_2[0].write(0, agg_expr_0_0); > /* 313 */ } > /* 314 */ > /* 315 */ if (agg_exprIsNull_1_0) { > /* 
316 */ agg_mutableStateArray_2[0].setNullAt(1); > /* 317 */ } else { > /* 318 */ agg_mutableStateArray_2[0].write(1, agg_expr_1_0); > /* 319 */ } > /* 320 */ > /* 321 */ if (agg_exprIsNull_2_0) { > /* 322 */ agg_mutableStateArray_2[0].setNullAt(2); > /* 323 */ } else { > /* 324 */ agg_mutableStateArray_2[0].write(2, agg_expr_2_0); > /* 325 */ } > /* 326 */ > /* 327 */ if (agg_exprIsNull_3_0) { > /* 328 */ agg_mutableStateArray_2[0].setNullAt(3); > /* 329 */ } else { > /* 330 */ agg_mutableStateArray_2[0].write(3, agg_expr_3_0); > /* 331 */ } > /* 332 */ > /* 333 */ if (agg_exprIsNull_4_0) { > /* 334 */ agg_mutableStateArray_2[0].setNullAt(4); > /* 335 */ } else { > /* 336 */ agg_mutableStateArray_2[0].write(4, agg_expr_4_0); > /* 337 */ } > /* 338 */ > /* 339 */ if (agg_exprIsNull_5_0) { > /* 340 */ agg_mutableStateArray_2[0].setNullAt(5); > /* 341 */ } else { > /* 342 */ agg_mutableStateArray_2[0].write(5, agg_expr_5_0); > /* 343 */ } > /* 344 */ > /* 345 */ if (agg_exprIsNull_6_0) { > /* 346 */ agg_mutableStateArray_2[0].setNullAt(6); > /* 347 */ } else { > /* 348 */ agg_mutableStateArray_2[0].write(6, agg_expr_6_0); > /* 349 */ } > /* 350 */ > agg_mutableStateArray_0[0].setTotalSize(agg_mutableStateArray_1[0].totalSize()); > /* 351 */ int agg_value_14 = 42; > /* 352 */ > /* 353 */ if (!agg_exprIsNull_0_0) { > /* 354 */ agg_value_14 = > org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(agg_expr_0_0.getBaseObject(), > agg_expr_0_0.getBaseOffset(), agg_expr_0_0.numBytes(), agg_value_14); > /* 355 */ } > /* 356 */ > /* 357 */ if (!agg_exprIsNull_1_0) { > /* 358 */ agg_value_14 = > org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(agg_expr_1_0.getBaseObject(), > agg_expr_1_0.getBaseOffset(), agg_expr_1_0.numBytes(), agg_value_14); > /* 359 */ } > /* 360 */ > /* 361 */ if (!agg_exprIsNull_2_0) { > /* 362 */ agg_value_14 = > org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(agg_expr_2_0.getBaseObject(), > agg_expr_2_0.getBaseOffset(), agg_expr_2_0.numBytes(), agg_value_14); > /* 363 */ } > /* 364 */ > /* 365 */ if (!agg_exprIsNull_3_0) { > /* 366 */ agg_value_14 = > org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(agg_expr_3_0.getBaseObject(), > agg_expr_3_0.getBaseOffset(), agg_expr_3_0.numBytes(), agg_value_14); > /* 367 */ } > /* 368 */ > /* 369 */ if (!agg_exprIsNull_4_0) { > /* 370 */ agg_value_14 = > org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(agg_expr_4_0.getBaseObject(), > agg_expr_4_0.getBaseOffset(), agg_expr_4_0.numBytes(), agg_value_14); > /* 371 */ } > /* 372 */ > /* 373 */ if (!agg_exprIsNull_5_0) { > /* 374 */ agg_value_14 = > org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(agg_expr_5_0.getBaseObject(), > agg_expr_5_0.getBaseOffset(), agg_expr_5_0.numBytes(), agg_value_14); > /* 375 */ } > /* 376 */ > /* 377 */ if (!agg_exprIsNull_6_0) { > /* 378 */ agg_value_14 = > org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(agg_expr_6_0.getBaseObject(), > agg_expr_6_0.getBaseOffset(), agg_expr_6_0.numBytes(), agg_value_14); > /* 379 */ } > /* 380 */ if (true) { > /* 381 */ // try to get the buffer from hash map > /* 382 */ agg_unsafeRowAggBuffer_0 = > /* 383 */ > agg_hashMap_0.getAggregationBufferFromUnsafeRow(agg_mutableStateArray_0[0], > agg_value_14); > /* 384 */ } > /* 385 */ // Can't allocate buffer from the hash map. Spill the map and > fallback to sort-based > /* 386 */ // aggregation after processing all input rows. 
> /* 387 */ if (agg_unsafeRowAggBuffer_0 == null) { > /* 388 */ if (agg_sorter_0 == null) { > /* 389 */ agg_sorter_0 = > agg_hashMap_0.destructAndCreateExternalSorter(); > /* 390 */ } else { > /* 391 */ > agg_sorter_0.merge(agg_hashMap_0.destructAndCreateExternalSorter()); > /* 392 */ } > /* 393 */ > /* 394 */ // the hash map had be spilled, it should have enough memory > now, > /* 395 */ // try to allocate buffer again. > /* 396 */ agg_unsafeRowAggBuffer_0 = > agg_hashMap_0.getAggregationBufferFromUnsafeRow( > /* 397 */ agg_mutableStateArray_0[0], agg_value_14); > /* 398 */ if (agg_unsafeRowAggBuffer_0 == null) { > /* 399 */ // failed to allocate the first page > /* 400 */ throw new OutOfMemoryError("No enough memory for > aggregation"); > /* 401 */ } > /* 402 */ } > /* 403 */ > /* 404 */ // common sub-expressions > /* 405 */ > /* 406 */ // evaluate aggregate function > /* 407 */ > /* 408 */ // update unsafe row buffer > /* 409 */ > /* 410 */ } > /* 411 */ > /* 412 */ private void agg_doAggregateWithKeys_0() throws > java.io.IOException { > /* 413 */ while (inputadapter_input_0.hasNext() && !stopEarly()) { > /* 414 */ InternalRow inputadapter_row_0 = (InternalRow) > inputadapter_input_0.next(); > /* 415 */ boolean inputadapter_isNull_0 = > inputadapter_row_0.isNullAt(0); > /* 416 */ UTF8String inputadapter_value_0 = inputadapter_isNull_0 ? > null : (inputadapter_row_0.getUTF8String(0)); > /* 417 */ boolean inputadapter_isNull_1 = > inputadapter_row_0.isNullAt(1); > /* 418 */ UTF8String inputadapter_value_1 = inputadapter_isNull_1 ? > null : (inputadapter_row_0.getUTF8String(1)); > /* 419 */ boolean inputadapter_isNull_2 = > inputadapter_row_0.isNullAt(2); > /* 420 */ UTF8String inputadapter_value_2 = inputadapter_isNull_2 ? > null : (inputadapter_row_0.getUTF8String(2)); > /* 421 */ boolean inputadapter_isNull_3 = > inputadapter_row_0.isNullAt(3); > /* 422 */ UTF8String inputadapter_value_3 = inputadapter_isNull_3 ? > null : (inputadapter_row_0.getUTF8String(3)); > /* 423 */ boolean inputadapter_isNull_4 = > inputadapter_row_0.isNullAt(4); > /* 424 */ UTF8String inputadapter_value_4 = inputadapter_isNull_4 ? > null : (inputadapter_row_0.getUTF8String(4)); > /* 425 */ boolean inputadapter_isNull_5 = > inputadapter_row_0.isNullAt(5); > /* 426 */ UTF8String inputadapter_value_5 = inputadapter_isNull_5 ? > null : (inputadapter_row_0.getUTF8String(5)); > /* 427 */ boolean inputadapter_isNull_6 = > inputadapter_row_0.isNullAt(6); > /* 428 */ UTF8String inputadapter_value_6 = inputadapter_isNull_6 ? 
> null : (inputadapter_row_0.getUTF8String(6)); > /* 429 */ > /* 430 */ agg_doConsume_0(inputadapter_row_0, inputadapter_value_0, > inputadapter_isNull_0, inputadapter_value_1, inputadapter_isNull_1, > inputadapter_value_2, inputadapter_isNull_2, inputadapter_value_3, > inputadapter_isNull_3, inputadapter_value_4, inputadapter_isNull_4, > inputadapter_value_5, inputadapter_isNull_5, inputadapter_value_6, > inputadapter_isNull_6); > /* 431 */ if (shouldStop()) return; > /* 432 */ } > /* 433 */ > /* 434 */ agg_mapIter_0 = > ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] > /* plan */).finishAggregate(agg_hashMap_0, agg_sorter_0, > ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* > peakMemory */), ((org.apache.spark.sql.execution.metric.SQLMetric) > references[2] /* spillSize */), > ((org.apache.spark.sql.execution.metric.SQLMetric) references[3] /* > avgHashProbe */)); > /* 435 */ } > /* 436 */ > /* 437 */ protected void processNext() throws java.io.IOException { > /* 438 */ if (!agg_initAgg_0) { > /* 439 */ agg_initAgg_0 = true; > /* 440 */ long wholestagecodegen_beforeAgg_0 = System.nanoTime(); > /* 441 */ agg_doAggregateWithKeys_0(); > /* 442 */ ((org.apache.spark.sql.execution.metric.SQLMetric) > references[11] /* aggTime */).add((System.nanoTime() - > wholestagecodegen_beforeAgg_0) / 1000000); > /* 443 */ } > /* 444 */ > /* 445 */ // output the result > /* 446 */ > /* 447 */ while (agg_mapIter_0.next()) { > /* 448 */ UnsafeRow agg_aggKey_0 = (UnsafeRow) agg_mapIter_0.getKey(); > /* 449 */ UnsafeRow agg_aggBuffer_0 = (UnsafeRow) > agg_mapIter_0.getValue(); > /* 450 */ agg_doAggregateWithKeysOutput_0(agg_aggKey_0, > agg_aggBuffer_0); > /* 451 */ > /* 452 */ if (shouldStop()) return; > /* 453 */ } > /* 454 */ > /* 455 */ agg_mapIter_0.close(); > /* 456 */ if (agg_sorter_0 == null) { > /* 457 */ agg_hashMap_0.free(); > /* 458 */ } > /* 459 */ } > /* 460 */ > /* 461 */ private void wholestagecodegen_init_0_0() { > /* 462 */ agg_hashMap_0 = > ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] > /* plan */).createHashMap(); > /* 463 */ inputadapter_input_0 = inputs[0]; > /* 464 */ agg_mutableStateArray_0[0] = new UnsafeRow(7); > /* 465 */ agg_mutableStateArray_1[0] = new > org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray_0[0], > 224); > /* 466 */ agg_mutableStateArray_2[0] = new > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray_1[0], > 7); > /* 467 */ agg_mutableStateArray_0[1] = new UnsafeRow(7); > /* 468 */ agg_mutableStateArray_1[1] = new > org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray_0[1], > 224); > /* 469 */ agg_mutableStateArray_2[1] = new > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray_1[1], > 7); > /* 470 */ > /* 471 */ bhj_relation_0 = > ((org.apache.spark.sql.execution.joins.UnsafeHashedRelation) > ((org.apache.spark.broadcast.TorrentBroadcast) references[5] /* broadcast > */).value()).asReadOnlyCopy(); > /* 472 */ incPeakExecutionMemory(bhj_relation_0.estimatedSize()); > /* 473 */ > /* 474 */ > org.apache.spark.TaskContext$.MODULE$.get().addTaskCompletionListener(new > org.apache.spark.util.TaskCompletionListener() { > /* 475 */ @Override > /* 476 */ public void onTaskCompletion(org.apache.spark.TaskContext > context) { > /* 477 */ ((org.apache.spark.sql.execution.metric.SQLMetric) > references[6] /* avgHashProbe > 
*/).set(bhj_relation_0.getAverageProbesPerLookup());
/* 478 */       }
/* 479 */     });
/* 480 */
/* 481 */   }
/* 482 */
/* 483 */ }
18/10/01 09:58:07 WARN WholeStageCodegenExec: Whole-stage codegen disabled for plan (id=6):
*(6) Project [colA#10, colB#11, colC#12, colD#13, colE#14, colF#15, colG#16, colH#41, ColI#42, colJ#43, colK#44, colL#45, colM#46, colN#47, colP#77, colQ#78, colR#79, colS#80]
+- *(6) BroadcastHashJoin [colA#10, colB#11, colC#12, colD#13, colE#14], [colA#72, colB#73, colC#74, colD#75, colE#76], LeftOuter, BuildRight
   :- *(6) Project [colA#10, colB#11, colC#12, colD#13, colE#14, colF#15, colG#16, colH#41, ColI#42, colJ#43, colK#44, colL#45, colM#46, colN#47]
   :  +- *(6) BroadcastHashJoin [colA#10, colB#11, colC#12, colD#13, colE#14, colF#15], [colA#35, colB#36, colC#37, colD#38, colE#39, colF#40], LeftOuter, BuildRight
   :     :- *(6) HashAggregate(keys=[colF#15, colA#10, colE#14, colC#12, colD#13, colG#16, colB#11], functions=[], output=[colA#10, colB#11, colC#12, colD#13, colE#14, colF#15, colG#16])
   :     :  +- Exchange hashpartitioning(colF#15, colA#10, colE#14, colC#12, colD#13, colG#16, colB#11, 200)
   :     :     +- *(1) HashAggregate(keys=[colF#15, colA#10, colE#14, colC#12, colD#13, colG#16, colB#11], functions=[], output=[colF#15, colA#10, colE#14, colC#12, colD#13, colG#16, colB#11])
   :     :        +- *(1) FileScan csv [colA#10,colB#11,colC#12,colD#13,colE#14,colF#15,colG#16] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/tbrugier/projects/analytics/dev/spark-bug/local/fileA.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<colA:string,colB:string,colC:string,colD:string,colE:string,colF:string,colG:string>
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, string, true], input[2, string, true], input[3, string, true], input[4, string, true], input[5, string, true]))
   :        +- *(3) HashAggregate(keys=[colF#40, colL#45, colA#35, colH#41, colE#39, colM#46, colC#37, colN#47, colD#38, colJ#43, ColI#42, colB#36, colK#44], functions=[], output=[colA#35, colB#36, colC#37, colD#38, colE#39, colF#40, colH#41, ColI#42, colJ#43, colK#44, colL#45, colM#46, colN#47])
   :           +- Exchange hashpartitioning(colF#40, colL#45, colA#35, colH#41, colE#39, colM#46, colC#37, colN#47, colD#38, colJ#43, ColI#42, colB#36, colK#44, 200)
   :              +- *(2) HashAggregate(keys=[colF#40, colL#45, colA#35, colH#41, colE#39, colM#46, colC#37, colN#47, colD#38, colJ#43, ColI#42, colB#36, colK#44], functions=[], output=[colF#40, colL#45, colA#35, colH#41, colE#39, colM#46, colC#37, colN#47, colD#38, colJ#43, ColI#42, colB#36, colK#44])
   :                 +- *(2) FileScan csv [colA#35,colB#36,colC#37,colD#38,colE#39,colF#40,colH#41,ColI#42,colJ#43,colK#44,colL#45,colM#46,colN#47] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/tbrugier/projects/analytics/dev/spark-bug/local/fileB.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<colA:string,colB:string,colC:string,colD:string,colE:string,colF:string,colH:string,ColI:s...
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, string, true], input[2, string, true], input[3, string, true], input[4, string, true]))
      +- *(5) HashAggregate(keys=[colR#79, colA#72, colE#76, colP#77, colC#74, colQ#78, colD#75, colS#80, colB#73], functions=[], output=[colA#72, colB#73, colC#74, colD#75, colE#76, colP#77, colQ#78, colR#79, colS#80])
         +- Exchange hashpartitioning(colR#79, colA#72, colE#76, colP#77, colC#74, colQ#78, colD#75, colS#80, colB#73, 200)
            +- *(4) HashAggregate(keys=[colR#79, colA#72, colE#76, colP#77, colC#74, colQ#78, colD#75, colS#80, colB#73], functions=[], output=[colR#79, colA#72, colE#76, colP#77, colC#74, colQ#78, colD#75, colS#80, colB#73])
               +- *(4) FileScan csv [colA#72,colB#73,colC#74,colD#75,colE#76,colP#77,colQ#78,colR#79,colS#80] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/tbrugier/projects/analytics/dev/spark-bug/local/fileC.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<colA:string,colB:string,colC:string,colD:string,colE:string,colP:string,colQ:string,colR:s...
{code}
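For reference, the snippet above calls an arrayToSeq helper that is not included in the report. A minimal sketch of such a helper, assuming it only wraps the Java array of column names into the Scala Seq<String> expected by Dataset.join(right, usingColumns, joinType) (the actual implementation used when reproducing the issue may differ):

{code:java}
import java.util.Arrays;

import scala.collection.JavaConverters;
import scala.collection.Seq;

public class JoinKeys {

    // Hypothetical helper: converts a Java String[] into the Scala Seq<String>
    // accepted by Dataset.join(Dataset<?>, Seq<String>, String).
    public static Seq<String> arrayToSeq(String[] columns) {
        return JavaConverters.asScalaBufferConverter(Arrays.asList(columns))
                .asScala()
                .toSeq();
    }
}
{code}

With a static import of a helper along these lines, the arrayToSeq(key_join_1) and arrayToSeq(key_join_2) calls in the snippet resolve against the Spark 2.2 Java API.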
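As the WARN line above shows, Spark disables whole-stage codegen for the affected plan and falls back to the non-codegen execution path, so the query itself still completes; the ERROR comes from the failed compilation attempt. A possible mitigation while this is open, assuming the goal is only to avoid the failed compilation and the log noise (it does not address the underlying codegen bug), is to turn whole-stage codegen off up front:

{code:java}
// Workaround sketch only: skipping whole-stage code generation avoids the Janino
// compile error (and the ERROR log line); it does not fix the underlying bug.
SparkConf conf = new SparkConf()
        .setAppName("SparkBug")
        .setMaster("local")
        .set("spark.sql.codegen.wholeStage", "false");
SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
// ... same reads and joins as in the snippet above ...
{code}

The same setting can also be supplied at submit time with --conf spark.sql.codegen.wholeStage=false.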