[ https://issues.apache.org/jira/browse/SPARK-26206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
indraneel r updated SPARK-26206: -------------------------------- Description: Spark structured streaming with kafka integration fails in update mode with a compilation exception in code generation. Here's the code that was executed: {code:java} // code placeholder override def main(args: Array[String]): Unit = { val spark = SparkSession .builder .master("local[*]") .appName("SparkStreamingTest") .getOrCreate() val kafkaParams = Map[String, String]( "kafka.bootstrap.servers" -> "localhost:9092", "startingOffsets" -> "earliest", "subscribe" -> "test_events") val schema = Encoders.product[UserEvent].schema val query = spark.readStream.format("kafka") .options(kafkaParams) .load() .selectExpr("CAST(value AS STRING) as message") .select(from_json(col("message"), schema).as("json")) .select("json.*") .groupBy(window(col("event_time"), "10 minutes")) .count() .writeStream .foreachBatch { (batch: Dataset[Row], batchId: Long) => println(s"batch : ${batchId}") batch.show(false) } .outputMode("update") .start() query.awaitTermination() }{code} It succeeds for batch 0 but fails for batch 1 with the following exception when more data arrives in the stream. 
{code:java} 18/11/28 22:07:08 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: A method named "putLong" is not declared in any enclosing class nor any supertype, nor through a static import org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: A method named "putLong" is not declared in any enclosing class nor any supertype, nor through a static import at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124) at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8997) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060) at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394) at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781) at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760) at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732) at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360) at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494) at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487) at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871) at 
org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:981) at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:414) at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:406) at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1295) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1306) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:848) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313) at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) at 
org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1290) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369) at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004) at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238) at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176) at org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253) at 
scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836) at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 18/11/28 22:07:08 ERROR Executor: Exception in task 28.0 in stage 19.0 (TID 355) java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: A method named "putLong" is not declared in any enclosing class nor any supertype, nor through a static import at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306) at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293) at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135) at org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410) at 
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380) at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004) at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238) at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176) at org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at 
org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: A method named "putLong" is not declared in any enclosing class nor any supertype, nor through a static import at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1304) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369) at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) ... 
47 more 18/11/28 22:07:08 WARN TaskSetManager: Lost task 28.0 in stage 19.0 (TID 355, localhost, executor driver): java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: A method named "putLong" is not declared in any enclosing class nor any supertype, nor through a static import at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306) at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293) at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135) at org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380) at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004) at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238) at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162) at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176) at org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: A method named "putLong" is not declared in any enclosing class nor any supertype, nor through a static import at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1304) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372) at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369) at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) ... 47 more {code} The structure of UserEvent: {code:java} case class UserEvent( timestamp: String, cid: Option[String], uid: Option[String], sessionId: Option[String], merchantId: Option[String], event: Option[String], ip: Option[String], refUrl: Option[String], referrer: Option[String], section: Option[String], tag: Option[String], eventType: Option[String], sid: Option[String] ) {code} And here's the generated code : {code:java} /* 001 */ public java.lang.Object generate(Object[] references) { /* 002 */ return new SpecificUnsafeRowJoiner(); /* 003 */ } /* 004 */ /* 005 */ class SpecificUnsafeRowJoiner extends org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner { /* 006 */ private byte[] buf = new byte[64]; /* 007 */ private UnsafeRow out = new UnsafeRow(2); /* 008 */ /* 009 */ /* 010 */ /* 011 */ public UnsafeRow join(UnsafeRow row1, UnsafeRow row2) { /* 012 */ // row1: 1 fields, 1 words in bitset /* 013 */ // row2: 1, 1 words in bitset /* 014 */ // output: 2 fields, 1 words in bitset /* 015 */ final int sizeInBytes = row1.getSizeInBytes() + row2.getSizeInBytes() - 8; /* 016 */ if (sizeInBytes > buf.length) { /* 017 */ buf = new byte[sizeInBytes]; /* 018 */ } /* 019 */ /* 020 */ final java.lang.Object obj1 = row1.getBaseObject(); /* 021 */ final long offset1 = row1.getBaseOffset(); /* 022 */ final java.lang.Object obj2 = row2.getBaseObject(); /* 023 */ final long offset2 = row2.getBaseOffset(); /* 024 */ /* 025 */ Platform.putLong(buf, 16, Platform.getLong(obj1, offset1 + 0) | (Platform.getLong(obj2, offset2) << 1)); /* 026 */ /* 027 */ /* 028 */ // Copy fixed length data for row1 /* 029 */ Platform.copyMemory( /* 030 */ obj1, offset1 + 8, /* 031 */ buf, 24, /* 032 */ 
8); /* 033 */ /* 034 */ /* 035 */ // Copy fixed length data for row2 /* 036 */ Platform.copyMemory( /* 037 */ obj2, offset2 + 8, /* 038 */ buf, 32, /* 039 */ 8); /* 040 */ /* 041 */ /* 042 */ // Copy variable length data for row1 /* 043 */ long numBytesVariableRow1 = row1.getSizeInBytes() - 16; /* 044 */ Platform.copyMemory( /* 045 */ obj1, offset1 + 16, /* 046 */ buf, 40, /* 047 */ numBytesVariableRow1); /* 048 */ /* 049 */ /* 050 */ // Copy variable length data for row2 /* 051 */ long numBytesVariableRow2 = row2.getSizeInBytes() - 16; /* 052 */ Platform.copyMemory( /* 053 */ obj2, offset2 + 16, /* 054 */ buf, 40 + numBytesVariableRow1, /* 055 */ numBytesVariableRow2); /* 056 */ /* 057 */ long existingOffset; /* 058 */ /* 059 */ existingOffset = Platform.getLong(buf, 24); /* 060 */ if (existingOffset != 0) { /* 061 */ Platform.putLong(buf, 24, existingOffset + (8L << 32)); /* 062 */ } /* 063 */ /* 064 */ /* 065 */ out.pointTo(buf, sizeInBytes); /* 066 */ /* 067 */ return out; /* 068 */ } /* 069 */ } {code} was: Spark structured streaming with kafka integration fails in update mode with compilation exception in code generation. 
Here's the code that was executed: {code:java} // code placeholder override def main(args: Array[String]): Unit = { val spark = SparkSession .builder .master("local[*]") .appName("SparkStreamingTest") .getOrCreate() val kafkaParams = Map[String, String]( "kafka.bootstrap.servers" -> "localhost:9092", "startingOffsets" -> "earliest", "subscribe" -> "test_events") val schema = Encoders.product[UserEvent].schema val query = spark.readStream.format("kafka") .options(kafkaParams) .load() .selectExpr("CAST(value AS STRING) as message") .select(from_json(col("message"), schema).as("json")) .select("json.*") .groupBy(window(col("event_time"), "10 minutes")) .count() .writeStream .foreachBatch { (batch: Dataset[Row], batchId: Long) => println(s"batch : ${batchId}") batch.show(false) } .outputMode("update") .start() query.awaitTermination() }{code} It succeeds for batch 0 but fails for batch 1 with following exception when more data is arrives in the stream. {code:java} 18/11/28 22:07:08 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: A method named "putLong" is not declared in any enclosing class nor any supertype, nor through a static import org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: A method named "putLong" is not declared in any enclosing class nor any supertype, nor through a static import at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124) at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8997) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060) at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394) at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) at 
org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781) at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760) at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732) at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360) at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494) at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487) at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:981) at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:414) at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:406) at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1295) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1306) at 
org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:848) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313) at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1290) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369) at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004) at 
org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238) at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176) at org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 18/11/28 22:07:08 ERROR Executor: Exception in task 28.0 in stage 19.0 (TID 355) java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: A method named "putLong" is not declared in any enclosing class nor any supertype, nor through a static import at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306) at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293) at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135) at org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380) at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004) at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238) at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164) at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176) at org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454) at 
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: A method named "putLong" is not declared in any enclosing class nor any supertype, nor through a static import at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1304) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369) at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) ... 47 more 18/11/28 22:07:08 WARN TaskSetManager: Lost task 28.0 in stage 19.0 (TID 355, localhost, executor driver): java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: A method named "putLong" is not declared in any enclosing class nor any supertype, nor through a static import at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306) at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293) at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135) at org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380) at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004) at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238) at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176) at org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown Source) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: A method named "putLong" is not declared in any enclosing class nor any supertype, nor through a static import at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1304) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369) at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) ... 47 more {code} The structure of UserEvent: {code:java} case class UserEvent( timestamp: String, cid: Option[String], uid: Option[String], sessionId: Option[String], merchantId: Option[String], platform: Option[Platform], event: Option[String], ip: Option[String], refUrl: Option[String], referrer: Option[String], section: Option[String], tag: Option[String], location: Option[Location], eventType: Option[String], sid: Option[String] ) case class Platform( browser: Option[String], deviceType: Option[String], platform: Option[String], deviceIsMobile: Option[Boolean] ) case class Location( city: Option[String], country: Option[String], latitude: Option[Double], longitude: Option[Double], timezone: Option[String] ) {code} And here's the generated code : {code:java} /* 001 */ public java.lang.Object generate(Object[] references) { /* 002 */ return new SpecificUnsafeRowJoiner(); /* 003 */ } /* 004 */ /* 005 */ class SpecificUnsafeRowJoiner extends org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner { /* 006 */ private byte[] buf = new byte[64]; /* 007 */ private UnsafeRow out = new UnsafeRow(2); 
/* 008 */ /* 009 */ /* 010 */ /* 011 */ public UnsafeRow join(UnsafeRow row1, UnsafeRow row2) { /* 012 */ // row1: 1 fields, 1 words in bitset /* 013 */ // row2: 1, 1 words in bitset /* 014 */ // output: 2 fields, 1 words in bitset /* 015 */ final int sizeInBytes = row1.getSizeInBytes() + row2.getSizeInBytes() - 8; /* 016 */ if (sizeInBytes > buf.length) { /* 017 */ buf = new byte[sizeInBytes]; /* 018 */ } /* 019 */ /* 020 */ final java.lang.Object obj1 = row1.getBaseObject(); /* 021 */ final long offset1 = row1.getBaseOffset(); /* 022 */ final java.lang.Object obj2 = row2.getBaseObject(); /* 023 */ final long offset2 = row2.getBaseOffset(); /* 024 */ /* 025 */ Platform.putLong(buf, 16, Platform.getLong(obj1, offset1 + 0) | (Platform.getLong(obj2, offset2) << 1)); /* 026 */ /* 027 */ /* 028 */ // Copy fixed length data for row1 /* 029 */ Platform.copyMemory( /* 030 */ obj1, offset1 + 8, /* 031 */ buf, 24, /* 032 */ 8); /* 033 */ /* 034 */ /* 035 */ // Copy fixed length data for row2 /* 036 */ Platform.copyMemory( /* 037 */ obj2, offset2 + 8, /* 038 */ buf, 32, /* 039 */ 8); /* 040 */ /* 041 */ /* 042 */ // Copy variable length data for row1 /* 043 */ long numBytesVariableRow1 = row1.getSizeInBytes() - 16; /* 044 */ Platform.copyMemory( /* 045 */ obj1, offset1 + 16, /* 046 */ buf, 40, /* 047 */ numBytesVariableRow1); /* 048 */ /* 049 */ /* 050 */ // Copy variable length data for row2 /* 051 */ long numBytesVariableRow2 = row2.getSizeInBytes() - 16; /* 052 */ Platform.copyMemory( /* 053 */ obj2, offset2 + 16, /* 054 */ buf, 40 + numBytesVariableRow1, /* 055 */ numBytesVariableRow2); /* 056 */ /* 057 */ long existingOffset; /* 058 */ /* 059 */ existingOffset = Platform.getLong(buf, 24); /* 060 */ if (existingOffset != 0) { /* 061 */ Platform.putLong(buf, 24, existingOffset + (8L << 32)); /* 062 */ } /* 063 */ /* 064 */ /* 065 */ out.pointTo(buf, sizeInBytes); /* 066 */ /* 067 */ return out; /* 068 */ } /* 069 */ } {code} > Spark structured streaming with kafka 
integration fails in update mode > ----------------------------------------------------------------------- > > Key: SPARK-26206 > URL: https://issues.apache.org/jira/browse/SPARK-26206 > Project: Spark > Issue Type: Bug > Components: Structured Streaming > Affects Versions: 2.4.0 > Environment: Operating system : MacOS Mojave > spark version : 2.4.0 > spark-sql-kafka-0-10 : 2.4.0 > kafka version 1.1.1 > scala version : 2.12.7 > Reporter: indraneel r > Priority: Blocker > > Spark structured streaming with kafka integration fails in update mode with > compilation exception in code generation. > Here's the code that was executed: > {code:java} > // code placeholder > override def main(args: Array[String]): Unit = { > val spark = SparkSession > .builder > .master("local[*]") > .appName("SparkStreamingTest") > .getOrCreate() > > val kafkaParams = Map[String, String]( > "kafka.bootstrap.servers" -> "localhost:9092", > "startingOffsets" -> "earliest", > "subscribe" -> "test_events") > > val schema = Encoders.product[UserEvent].schema > val query = spark.readStream.format("kafka") > .options(kafkaParams) > .load() > .selectExpr("CAST(value AS STRING) as message") > .select(from_json(col("message"), schema).as("json")) > .select("json.*") > .groupBy(window(col("event_time"), "10 minutes")) > .count() > .writeStream > .foreachBatch { (batch: Dataset[Row], batchId: Long) => > println(s"batch : ${batchId}") > batch.show(false) > } > .outputMode("update") > .start() > query.awaitTermination() > }{code} > It succeeds for batch 0 but fails for batch 1 with the following exception when > more data arrives in the stream. 
> {code:java} > 18/11/28 22:07:08 ERROR CodeGenerator: failed to compile: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 25, Column 18: A method named "putLong" is not declared in any enclosing > class nor any supertype, nor through a static import > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 25, Column 18: A method named "putLong" is not declared in any enclosing > class nor any supertype, nor through a static import > at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124) > at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8997) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060) > at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) > at > org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781) > at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760) > at > org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360) > at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494) > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487) > at 
org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) > at > org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:981) > at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:414) > at > org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:406) > at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1295) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1306) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:848) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) > at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) > at > org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) > at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) > at > org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) > at > org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313) > at > 
org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1290) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369) > at > org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) > at > org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) > at > org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) > at > org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) > at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) > at > org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004) > at > org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238) > at > org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258) > at > org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164) > at > org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162) > at > org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198) > at > 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176) > at > org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253) > at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454) > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255) > at > org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836) > at > org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) > at org.apache.spark.scheduler.Task.run(Task.scala:121) > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > 18/11/28 22:07:08 ERROR Executor: Exception in task 28.0 in stage 19.0 (TID > 355) > java.util.concurrent.ExecutionException: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 25, Column 18: failed to compile: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 25, Column 18: A method named "putLong" is not declared in any enclosing > class nor any supertype, nor through a static import > at > org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306) > at > 
org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293) > at > org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) > at > org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135) > at > org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410) > at > org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380) > at > org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) > at > org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) > at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) > at > org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004) > at > org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238) > at > org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258) > at > org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164) > at > org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162) > at > org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198) > at > org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176) > at > org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253) > at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486) > at > 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454) > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255) > at > org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836) > at > org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836) > at > 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) > at org.apache.spark.scheduler.Task.run(Task.scala:121) > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.codehaus.commons.compiler.CompileException: File > 'generated.java', Line 25, Column 18: failed to compile: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 25, Column 18: A method named "putLong" is not declared in any enclosing > class nor any supertype, nor through a static import > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1304) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369) > at > org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) > at > org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) > ... 
47 more > 18/11/28 22:07:08 WARN TaskSetManager: Lost task 28.0 in stage 19.0 (TID 355, > localhost, executor driver): java.util.concurrent.ExecutionException: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 25, Column 18: failed to compile: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 25, Column 18: A method named "putLong" is not declared in any enclosing > class nor any supertype, nor through a static import > at > org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306) > at > org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293) > at > org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) > at > org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135) > at > org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410) > at > org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380) > at > org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) > at > org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) > at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) > at > org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004) > at > org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238) > at > org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258) > at > org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164) > at > 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162) > at > org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198) > at > org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176) > at > org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253) > at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) > at 
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454) > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255) > at > org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836) > at > org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) > at org.apache.spark.scheduler.Task.run(Task.scala:121) > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.codehaus.commons.compiler.CompileException: File > 'generated.java', Line 25, Column 18: failed to compile: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 25, Column 18: A method named "putLong" is not declared in any enclosing > class 
nor any supertype, nor through a static import > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1304) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369) > at > org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) > at > org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) > ... 47 more > {code} > The structure of UserEvent: > {code:java} > case class UserEvent( > timestamp: String, > cid: Option[String], > uid: Option[String], > sessionId: Option[String], > merchantId: Option[String], > event: Option[String], > ip: Option[String], > refUrl: Option[String], > referrer: Option[String], > section: Option[String], > tag: Option[String], > eventType: Option[String], > sid: Option[String] > ) > {code} > And here's the generated code: > {code:java} > /* 001 */ public java.lang.Object generate(Object[] references) { > /* 002 */ return new SpecificUnsafeRowJoiner(); > /* 003 */ } > /* 004 */ > /* 005 */ class SpecificUnsafeRowJoiner extends > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner { > /* 006 */ private byte[] buf = new byte[64]; > /* 007 */ private UnsafeRow out = new UnsafeRow(2); > /* 008 */ > /* 009 */ > /* 010 */ > /* 011 */ public UnsafeRow join(UnsafeRow row1, UnsafeRow row2) { > /* 012 */ // row1: 1 fields, 1 words in bitset > /* 013 */ // row2: 1, 1 words in bitset > /* 014 */ // output: 2 fields, 1 words in bitset > /* 015 */ final int sizeInBytes = row1.getSizeInBytes() + > row2.getSizeInBytes() - 8; > /* 016 */ if (sizeInBytes > buf.length) { > /* 017 */ buf = new byte[sizeInBytes]; > /* 018 */ } > /* 019 */ > /* 020 */ final java.lang.Object obj1 = row1.getBaseObject(); > /* 021 */ 
final long offset1 = row1.getBaseOffset(); > /* 022 */ final java.lang.Object obj2 = row2.getBaseObject(); > /* 023 */ final long offset2 = row2.getBaseOffset(); > /* 024 */ > /* 025 */ Platform.putLong(buf, 16, Platform.getLong(obj1, offset1 + 0) | > (Platform.getLong(obj2, offset2) << 1)); > /* 026 */ > /* 027 */ > /* 028 */ // Copy fixed length data for row1 > /* 029 */ Platform.copyMemory( > /* 030 */ obj1, offset1 + 8, > /* 031 */ buf, 24, > /* 032 */ 8); > /* 033 */ > /* 034 */ > /* 035 */ // Copy fixed length data for row2 > /* 036 */ Platform.copyMemory( > /* 037 */ obj2, offset2 + 8, > /* 038 */ buf, 32, > /* 039 */ 8); > /* 040 */ > /* 041 */ > /* 042 */ // Copy variable length data for row1 > /* 043 */ long numBytesVariableRow1 = row1.getSizeInBytes() - 16; > /* 044 */ Platform.copyMemory( > /* 045 */ obj1, offset1 + 16, > /* 046 */ buf, 40, > /* 047 */ numBytesVariableRow1); > /* 048 */ > /* 049 */ > /* 050 */ // Copy variable length data for row2 > /* 051 */ long numBytesVariableRow2 = row2.getSizeInBytes() - 16; > /* 052 */ Platform.copyMemory( > /* 053 */ obj2, offset2 + 16, > /* 054 */ buf, 40 + numBytesVariableRow1, > /* 055 */ numBytesVariableRow2); > /* 056 */ > /* 057 */ long existingOffset; > /* 058 */ > /* 059 */ existingOffset = Platform.getLong(buf, 24); > /* 060 */ if (existingOffset != 0) { > /* 061 */ Platform.putLong(buf, 24, existingOffset + (8L << 32)); > /* 062 */ } > /* 063 */ > /* 064 */ > /* 065 */ out.pointTo(buf, sizeInBytes); > /* 066 */ > /* 067 */ return out; > /* 068 */ } > /* 069 */ } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org