[ 
https://issues.apache.org/jira/browse/SPARK-26206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

indraneel r updated SPARK-26206:
--------------------------------
    Description: 
Spark Structured Streaming with Kafka integration fails in update mode with 
a compilation exception in code generation. 
 Here's the code that was executed:
{code:java}
// code placeholder

override def main(args: Array[String]): Unit = {
  val spark = SparkSession
    .builder
    .master("local[*]")
    .appName("SparkStreamingTest")
    .getOrCreate()
 
  val kafkaParams = Map[String, String](
                   "kafka.bootstrap.servers" -> "localhost:9092",
                   "startingOffsets" -> "earliest",
                   "subscribe" -> "test_events")
 
  val schema = Encoders.product[UserEvent].schema
  val query = spark.readStream.format("kafka")
    .options(kafkaParams)
    .load()
    .selectExpr("CAST(value AS STRING) as message")
    .select(from_json(col("message"), schema).as("json"))
    .select("json.*")
    .groupBy(window(col("event_time"), "10 minutes"))
    .count()
    .writeStream
    .foreachBatch { (batch: Dataset[Row], batchId: Long) =>
      println(s"batch : ${batchId}")
      batch.show(false)
    }
    .outputMode("update")
    .start()

    query.awaitTermination()
}{code}
It succeeds for batch 0 but fails for batch 1 with the following exception when 
more data arrives in the stream.
{code:java}
18/11/28 22:07:08 ERROR CodeGenerator: failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, 
Column 18: A method named "putLong" is not declared in any enclosing class nor 
any supertype, nor through a static import
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, 
Column 18: A method named "putLong" is not declared in any enclosing class nor 
any supertype, nor through a static import
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
    at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8997)
    at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
    at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
    at 
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
    at 
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
    at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
    at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
    at 
org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
    at 
org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
    at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
    at 
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
    at 
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
    at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
    at 
org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
    at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
    at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:981)
    at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:215)
    at 
org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:414)
    at 
org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:406)
    at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1295)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
    at 
org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1306)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:848)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
    at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
    at 
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
    at 
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
    at 
org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
    at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
    at 
org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
    at 
org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313)
    at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1290)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369)
    at 
org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at 
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    at 
org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at 
org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
    at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
    at 
org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176)
    at 
org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown
 Source)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at 
org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379)
    at 
org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown
 Source)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
    at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/11/28 22:07:08 ERROR Executor: Exception in task 28.0 in stage 19.0 (TID 355)
java.util.concurrent.ExecutionException: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, 
Column 18: failed to compile: org.codehaus.commons.compiler.CompileException: 
File 'generated.java', Line 25, Column 18: A method named "putLong" is not 
declared in any enclosing class nor any supertype, nor through a static import
    at 
org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
    at 
org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
    at 
org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
    at 
org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
    at 
org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
    at 
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
    at 
org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at 
org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
    at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
    at 
org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176)
    at 
org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown
 Source)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at 
org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379)
    at 
org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown
 Source)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
    at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: File 
'generated.java', Line 25, Column 18: failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, 
Column 18: A method named "putLong" is not declared in any enclosing class nor 
any supertype, nor through a static import
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1304)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369)
    at 
org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at 
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    ... 47 more
18/11/28 22:07:08 WARN TaskSetManager: Lost task 28.0 in stage 19.0 (TID 355, 
localhost, executor driver): java.util.concurrent.ExecutionException: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, 
Column 18: failed to compile: org.codehaus.commons.compiler.CompileException: 
File 'generated.java', Line 25, Column 18: A method named "putLong" is not 
declared in any enclosing class nor any supertype, nor through a static import
    at 
org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
    at 
org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
    at 
org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
    at 
org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
    at 
org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
    at 
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
    at 
org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at 
org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
    at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
    at 
org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176)
    at 
org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown
 Source)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at 
org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379)
    at 
org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown
 Source)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
    at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: File 
'generated.java', Line 25, Column 18: failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, 
Column 18: A method named "putLong" is not declared in any enclosing class nor 
any supertype, nor through a static import
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1304)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369)
    at 
org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at 
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    ... 47 more

{code}
 The structure of UserEvent:
{code:java}
case class UserEvent(
 timestamp: String,
 cid: Option[String],
 uid: Option[String],
 sessionId: Option[String],
 merchantId: Option[String],
 event: Option[String],
 ip: Option[String],
 refUrl: Option[String],
 referrer: Option[String],
 section: Option[String],
 tag: Option[String],
 eventType: Option[String],
 sid: Option[String]
 )

{code}
And here's the generated code:
{code:java}
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */ return new SpecificUnsafeRowJoiner();
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeRowJoiner extends 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner {
/* 006 */ private byte[] buf = new byte[64];
/* 007 */ private UnsafeRow out = new UnsafeRow(2);
/* 008 */
/* 009 */
/* 010 */
/* 011 */ public UnsafeRow join(UnsafeRow row1, UnsafeRow row2) {
/* 012 */ // row1: 1 fields, 1 words in bitset
/* 013 */ // row2: 1, 1 words in bitset
/* 014 */ // output: 2 fields, 1 words in bitset
/* 015 */ final int sizeInBytes = row1.getSizeInBytes() + row2.getSizeInBytes() 
- 8;
/* 016 */ if (sizeInBytes > buf.length) {
/* 017 */ buf = new byte[sizeInBytes];
/* 018 */ }
/* 019 */
/* 020 */ final java.lang.Object obj1 = row1.getBaseObject();
/* 021 */ final long offset1 = row1.getBaseOffset();
/* 022 */ final java.lang.Object obj2 = row2.getBaseObject();
/* 023 */ final long offset2 = row2.getBaseOffset();
/* 024 */
/* 025 */ Platform.putLong(buf, 16, Platform.getLong(obj1, offset1 + 0) | 
(Platform.getLong(obj2, offset2) << 1));
/* 026 */
/* 027 */
/* 028 */ // Copy fixed length data for row1
/* 029 */ Platform.copyMemory(
/* 030 */ obj1, offset1 + 8,
/* 031 */ buf, 24,
/* 032 */ 8);
/* 033 */
/* 034 */
/* 035 */ // Copy fixed length data for row2
/* 036 */ Platform.copyMemory(
/* 037 */ obj2, offset2 + 8,
/* 038 */ buf, 32,
/* 039 */ 8);
/* 040 */
/* 041 */
/* 042 */ // Copy variable length data for row1
/* 043 */ long numBytesVariableRow1 = row1.getSizeInBytes() - 16;
/* 044 */ Platform.copyMemory(
/* 045 */ obj1, offset1 + 16,
/* 046 */ buf, 40,
/* 047 */ numBytesVariableRow1);
/* 048 */
/* 049 */
/* 050 */ // Copy variable length data for row2
/* 051 */ long numBytesVariableRow2 = row2.getSizeInBytes() - 16;
/* 052 */ Platform.copyMemory(
/* 053 */ obj2, offset2 + 16,
/* 054 */ buf, 40 + numBytesVariableRow1,
/* 055 */ numBytesVariableRow2);
/* 056 */
/* 057 */ long existingOffset;
/* 058 */
/* 059 */ existingOffset = Platform.getLong(buf, 24);
/* 060 */ if (existingOffset != 0) {
/* 061 */ Platform.putLong(buf, 24, existingOffset + (8L << 32));
/* 062 */ }
/* 063 */
/* 064 */
/* 065 */ out.pointTo(buf, sizeInBytes);
/* 066 */
/* 067 */ return out;
/* 068 */ }
/* 069 */ }

{code}

  was:
Spark Structured Streaming with Kafka integration fails in update mode with 
a compilation exception in code generation. 
 Here's the code that was executed:
{code:java}
// code placeholder

override def main(args: Array[String]): Unit = {
  val spark = SparkSession
    .builder
    .master("local[*]")
    .appName("SparkStreamingTest")
    .getOrCreate()
 
  val kafkaParams = Map[String, String](
                   "kafka.bootstrap.servers" -> "localhost:9092",
                   "startingOffsets" -> "earliest",
                   "subscribe" -> "test_events")
 
  val schema = Encoders.product[UserEvent].schema
  val query = spark.readStream.format("kafka")
    .options(kafkaParams)
    .load()
    .selectExpr("CAST(value AS STRING) as message")
    .select(from_json(col("message"), schema).as("json"))
    .select("json.*")
    .groupBy(window(col("event_time"), "10 minutes"))
    .count()
    .writeStream
    .foreachBatch { (batch: Dataset[Row], batchId: Long) =>
      println(s"batch : ${batchId}")
      batch.show(false)
    }
    .outputMode("update")
    .start()

    query.awaitTermination()
}{code}
It succeeds for batch 0 but fails for batch 1 with the following exception when 
more data arrives in the stream.
{code:java}
18/11/28 22:07:08 ERROR CodeGenerator: failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, 
Column 18: A method named "putLong" is not declared in any enclosing class nor 
any supertype, nor through a static import
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, 
Column 18: A method named "putLong" is not declared in any enclosing class nor 
any supertype, nor through a static import
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
    at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8997)
    at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
    at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
    at 
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
    at 
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
    at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
    at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
    at 
org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
    at 
org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
    at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
    at 
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
    at 
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
    at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
    at 
org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
    at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
    at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:981)
    at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:215)
    at 
org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:414)
    at 
org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:406)
    at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1295)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
    at 
org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1306)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:848)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
    at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
    at 
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
    at 
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
    at 
org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
    at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
    at 
org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
    at 
org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313)
    at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1290)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369)
    at 
org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at 
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    at 
org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at 
org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
    at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
    at 
org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176)
    at 
org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown
 Source)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at 
org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379)
    at 
org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown
 Source)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
    at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/11/28 22:07:08 ERROR Executor: Exception in task 28.0 in stage 19.0 (TID 355)
java.util.concurrent.ExecutionException: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, 
Column 18: failed to compile: org.codehaus.commons.compiler.CompileException: 
File 'generated.java', Line 25, Column 18: A method named "putLong" is not 
declared in any enclosing class nor any supertype, nor through a static import
    at 
org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
    at 
org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
    at 
org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
    at 
org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
    at 
org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
    at 
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
    at 
org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at 
org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
    at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
    at 
org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176)
    at 
org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown
 Source)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at 
org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379)
    at 
org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown
 Source)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
    at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: File 
'generated.java', Line 25, Column 18: failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, 
Column 18: A method named "putLong" is not declared in any enclosing class nor 
any supertype, nor through a static import
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1304)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369)
    at 
org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at 
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    ... 47 more
18/11/28 22:07:08 WARN TaskSetManager: Lost task 28.0 in stage 19.0 (TID 355, 
localhost, executor driver): java.util.concurrent.ExecutionException: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, 
Column 18: failed to compile: org.codehaus.commons.compiler.CompileException: 
File 'generated.java', Line 25, Column 18: A method named "putLong" is not 
declared in any enclosing class nor any supertype, nor through a static import
    at 
org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
    at 
org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
    at 
org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
    at 
org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
    at 
org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
    at 
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
    at 
org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at 
org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
    at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
    at 
org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198)
    at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176)
    at 
org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown
 Source)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at 
org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379)
    at 
org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown
 Source)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
    at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: File 
'generated.java', Line 25, Column 18: failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, 
Column 18: A method named "putLong" is not declared in any enclosing class nor 
any supertype, nor through a static import
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1304)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369)
    at 
org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at 
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    ... 47 more

{code}
 The structure of UserEvent:
{code:java}
case class UserEvent(
 timestamp: String,
 cid: Option[String],
 uid: Option[String],
 sessionId: Option[String],
 merchantId: Option[String],
 platform: Option[Platform],
 event: Option[String],
 ip: Option[String],
 refUrl: Option[String],
 referrer: Option[String],
 section: Option[String],
 tag: Option[String],
 location: Option[Location],
 eventType: Option[String],
 sid: Option[String]
 )


case class Platform(
 browser: Option[String],
 deviceType: Option[String],
 platform: Option[String],
 deviceIsMobile: Option[Boolean]
 )

case class Location(
 city: Option[String],
 country: Option[String],
 latitude: Option[Double],
 longitude: Option[Double],
 timezone: Option[String]
 )
{code}
And here's the generated code:
{code:java}
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */ return new SpecificUnsafeRowJoiner();
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeRowJoiner extends 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner {
/* 006 */ private byte[] buf = new byte[64];
/* 007 */ private UnsafeRow out = new UnsafeRow(2);
/* 008 */
/* 009 */
/* 010 */
/* 011 */ public UnsafeRow join(UnsafeRow row1, UnsafeRow row2) {
/* 012 */ // row1: 1 fields, 1 words in bitset
/* 013 */ // row2: 1, 1 words in bitset
/* 014 */ // output: 2 fields, 1 words in bitset
/* 015 */ final int sizeInBytes = row1.getSizeInBytes() + row2.getSizeInBytes() 
- 8;
/* 016 */ if (sizeInBytes > buf.length) {
/* 017 */ buf = new byte[sizeInBytes];
/* 018 */ }
/* 019 */
/* 020 */ final java.lang.Object obj1 = row1.getBaseObject();
/* 021 */ final long offset1 = row1.getBaseOffset();
/* 022 */ final java.lang.Object obj2 = row2.getBaseObject();
/* 023 */ final long offset2 = row2.getBaseOffset();
/* 024 */
/* 025 */ Platform.putLong(buf, 16, Platform.getLong(obj1, offset1 + 0) | 
(Platform.getLong(obj2, offset2) << 1));
/* 026 */
/* 027 */
/* 028 */ // Copy fixed length data for row1
/* 029 */ Platform.copyMemory(
/* 030 */ obj1, offset1 + 8,
/* 031 */ buf, 24,
/* 032 */ 8);
/* 033 */
/* 034 */
/* 035 */ // Copy fixed length data for row2
/* 036 */ Platform.copyMemory(
/* 037 */ obj2, offset2 + 8,
/* 038 */ buf, 32,
/* 039 */ 8);
/* 040 */
/* 041 */
/* 042 */ // Copy variable length data for row1
/* 043 */ long numBytesVariableRow1 = row1.getSizeInBytes() - 16;
/* 044 */ Platform.copyMemory(
/* 045 */ obj1, offset1 + 16,
/* 046 */ buf, 40,
/* 047 */ numBytesVariableRow1);
/* 048 */
/* 049 */
/* 050 */ // Copy variable length data for row2
/* 051 */ long numBytesVariableRow2 = row2.getSizeInBytes() - 16;
/* 052 */ Platform.copyMemory(
/* 053 */ obj2, offset2 + 16,
/* 054 */ buf, 40 + numBytesVariableRow1,
/* 055 */ numBytesVariableRow2);
/* 056 */
/* 057 */ long existingOffset;
/* 058 */
/* 059 */ existingOffset = Platform.getLong(buf, 24);
/* 060 */ if (existingOffset != 0) {
/* 061 */ Platform.putLong(buf, 24, existingOffset + (8L << 32));
/* 062 */ }
/* 063 */
/* 064 */
/* 065 */ out.pointTo(buf, sizeInBytes);
/* 066 */
/* 067 */ return out;
/* 068 */ }
/* 069 */ }

{code}


> Spark structured streaming with kafka integration fails in update mode 
> -----------------------------------------------------------------------
>
>                 Key: SPARK-26206
>                 URL: https://issues.apache.org/jira/browse/SPARK-26206
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>         Environment: Operating system : MacOS Mojave
>  spark version : 2.4.0
> spark-sql-kafka-0-10 : 2.4.0
>  kafka version 1.1.1
> scala version : 2.12.7
>            Reporter: indraneel r
>            Priority: Blocker
>
> Spark Structured Streaming with Kafka integration fails in update mode with a 
> compilation exception in code generation. 
>  Here's the code that was executed:
> {code:java}
> // code placeholder
> override def main(args: Array[String]): Unit = {
>   val spark = SparkSession
>     .builder
>     .master("local[*]")
>     .appName("SparkStreamingTest")
>     .getOrCreate()
>  
>   val kafkaParams = Map[String, String](
>                    "kafka.bootstrap.servers" -> "localhost:9092",
>                    "startingOffsets" -> "earliest",
>                    "subscribe" -> "test_events")
>  
>   val schema = Encoders.product[UserEvent].schema
>   val query = spark.readStream.format("kafka")
>     .options(kafkaParams)
>     .load()
>     .selectExpr("CAST(value AS STRING) as message")
>     .select(from_json(col("message"), schema).as("json"))
>     .select("json.*")
>     .groupBy(window(col("event_time"), "10 minutes"))
>     .count()
>     .writeStream
>     .foreachBatch { (batch: Dataset[Row], batchId: Long) =>
>       println(s"batch : ${batchId}")
>       batch.show(false)
>     }
>     .outputMode("update")
>     .start()
>     query.awaitTermination()
> }{code}
> It succeeds for batch 0 but fails for batch 1 with the following exception when 
> more data arrives in the stream.
> {code:java}
> 18/11/28 22:07:08 ERROR CodeGenerator: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 25, Column 18: A method named "putLong" is not declared in any enclosing 
> class nor any supertype, nor through a static import
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 25, Column 18: A method named "putLong" is not declared in any enclosing 
> class nor any supertype, nor through a static import
>     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8997)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
>     at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
>     at 
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
>     at 
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>     at 
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
>     at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
>     at 
> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
>     at 
> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
>     at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
>     at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
>     at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
>     at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>     at 
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
>     at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
>     at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:981)
>     at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:215)
>     at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:414)
>     at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:406)
>     at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1295)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
>     at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1306)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:848)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
>     at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
>     at 
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
>     at 
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
>     at 
> org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
>     at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
>     at 
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
>     at 
> org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313)
>     at 
> org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1290)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369)
>     at 
> org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>     at 
> org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>     at 
> org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>     at 
> org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>     at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>     at 
> org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>     at 
> org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258)
>     at 
> org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164)
>     at 
> org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162)
>     at 
> org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198)
>     at 
> org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176)
>     at 
> org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253)
>     at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486)
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown
>  Source)
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
>  Source)
>     at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>     at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379)
>     at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown
>  Source)
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown
>  Source)
>     at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>  Source)
>     at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
>     at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
>     at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
>     at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:121)
>     at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> 18/11/28 22:07:08 ERROR Executor: Exception in task 28.0 in stage 19.0 (TID 
> 355)
> java.util.concurrent.ExecutionException: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 25, Column 18: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 25, Column 18: A method named "putLong" is not declared in any enclosing 
> class nor any supertype, nor through a static import
>     at 
> org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
>     at 
> org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
>     at 
> org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>     at 
> org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
>     at 
> org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
>     at 
> org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
>     at 
> org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>     at 
> org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>     at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>     at 
> org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>     at 
> org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258)
>     at 
> org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164)
>     at 
> org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162)
>     at 
> org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198)
>     at 
> org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176)
>     at 
> org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253)
>     at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486)
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown
>  Source)
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
>  Source)
>     at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>     at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379)
>     at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown
>  Source)
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown
>  Source)
>     at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>  Source)
>     at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
>     at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
>     at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
>     at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:121)
>     at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.codehaus.commons.compiler.CompileException: File 
> 'generated.java', Line 25, Column 18: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 25, Column 18: A method named "putLong" is not declared in any enclosing 
> class nor any supertype, nor through a static import
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1304)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369)
>     at 
> org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>     at 
> org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>     ... 47 more
> 18/11/28 22:07:08 WARN TaskSetManager: Lost task 28.0 in stage 19.0 (TID 355, 
> localhost, executor driver): java.util.concurrent.ExecutionException: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 25, Column 18: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 25, Column 18: A method named "putLong" is not declared in any enclosing 
> class nor any supertype, nor through a static import
>     at 
> org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
>     at 
> org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
>     at 
> org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>     at 
> org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
>     at 
> org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
>     at 
> org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
>     at 
> org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>     at 
> org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>     at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>     at 
> org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>     at 
> org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258)
>     at 
> org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164)
>     at 
> org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162)
>     at 
> org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198)
>     at 
> org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176)
>     at 
> org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253)
>     at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486)
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown
>  Source)
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
>  Source)
>     at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>     at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379)
>     at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown
>  Source)
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown
>  Source)
>     at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>  Source)
>     at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
>     at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
>     at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
>     at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:121)
>     at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.codehaus.commons.compiler.CompileException: File 
> 'generated.java', Line 25, Column 18: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 25, Column 18: A method named "putLong" is not declared in any enclosing 
> class nor any supertype, nor through a static import
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1304)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369)
>     at 
> org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>     at 
> org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>     ... 47 more
> {code}
>  The structure of the UserEvent case class from which the schema is derived:
> {code:java}
> case class UserEvent(
>  timestamp: String,
>  cid: Option[String],
>  uid: Option[String],
>  sessionId: Option[String],
>  merchantId: Option[String],
>  event: Option[String],
>  ip: Option[String],
>  refUrl: Option[String],
>  referrer: Option[String],
>  section: Option[String],
>  tag: Option[String],
>  eventType: Option[String],
>  sid: Option[String]
>  )
> {code}
> And here's the generated code for the UnsafeRowJoiner that fails to compile:
> {code:java}
> /* 001 */ public java.lang.Object generate(Object[] references) {
> /* 002 */ return new SpecificUnsafeRowJoiner();
> /* 003 */ }
> /* 004 */
> /* 005 */ class SpecificUnsafeRowJoiner extends 
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner {
> /* 006 */ private byte[] buf = new byte[64];
> /* 007 */ private UnsafeRow out = new UnsafeRow(2);
> /* 008 */
> /* 009 */
> /* 010 */
> /* 011 */ public UnsafeRow join(UnsafeRow row1, UnsafeRow row2) {
> /* 012 */ // row1: 1 fields, 1 words in bitset
> /* 013 */ // row2: 1, 1 words in bitset
> /* 014 */ // output: 2 fields, 1 words in bitset
> /* 015 */ final int sizeInBytes = row1.getSizeInBytes() + 
> row2.getSizeInBytes() - 8;
> /* 016 */ if (sizeInBytes > buf.length) {
> /* 017 */ buf = new byte[sizeInBytes];
> /* 018 */ }
> /* 019 */
> /* 020 */ final java.lang.Object obj1 = row1.getBaseObject();
> /* 021 */ final long offset1 = row1.getBaseOffset();
> /* 022 */ final java.lang.Object obj2 = row2.getBaseObject();
> /* 023 */ final long offset2 = row2.getBaseOffset();
> /* 024 */
> /* 025 */ Platform.putLong(buf, 16, Platform.getLong(obj1, offset1 + 0) | 
> (Platform.getLong(obj2, offset2) << 1));
> /* 026 */
> /* 027 */
> /* 028 */ // Copy fixed length data for row1
> /* 029 */ Platform.copyMemory(
> /* 030 */ obj1, offset1 + 8,
> /* 031 */ buf, 24,
> /* 032 */ 8);
> /* 033 */
> /* 034 */
> /* 035 */ // Copy fixed length data for row2
> /* 036 */ Platform.copyMemory(
> /* 037 */ obj2, offset2 + 8,
> /* 038 */ buf, 32,
> /* 039 */ 8);
> /* 040 */
> /* 041 */
> /* 042 */ // Copy variable length data for row1
> /* 043 */ long numBytesVariableRow1 = row1.getSizeInBytes() - 16;
> /* 044 */ Platform.copyMemory(
> /* 045 */ obj1, offset1 + 16,
> /* 046 */ buf, 40,
> /* 047 */ numBytesVariableRow1);
> /* 048 */
> /* 049 */
> /* 050 */ // Copy variable length data for row2
> /* 051 */ long numBytesVariableRow2 = row2.getSizeInBytes() - 16;
> /* 052 */ Platform.copyMemory(
> /* 053 */ obj2, offset2 + 16,
> /* 054 */ buf, 40 + numBytesVariableRow1,
> /* 055 */ numBytesVariableRow2);
> /* 056 */
> /* 057 */ long existingOffset;
> /* 058 */
> /* 059 */ existingOffset = Platform.getLong(buf, 24);
> /* 060 */ if (existingOffset != 0) {
> /* 061 */ Platform.putLong(buf, 24, existingOffset + (8L << 32));
> /* 062 */ }
> /* 063 */
> /* 064 */
> /* 065 */ out.pointTo(buf, sizeInBytes);
> /* 066 */
> /* 067 */ return out;
> /* 068 */ }
> /* 069 */ }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to