[ https://issues.apache.org/jira/browse/SPARK-34485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kim Min Woo updated SPARK-34485:
--------------------------------
Description: 
I ran the following Spark Structured Streaming application, but it failed with:

ERROR CodeGenerator: failed to compile: org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass" in "generated.java": Code of method "expand_doConsume_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage3;Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/apache/spark/sql/catalyst/InternalRow;ZJZ)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3" grows beyond 64 KB

{code:java}
def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column
{code}

However, if I increased the slideDuration value (from 1 second to 2 seconds), the above error message did not show up.

The code is simple, but I don't understand why the GeneratedIterator grows beyond 64 KB. [With this link (Gist),|https://gist.github.com/c357c28138f7f57505189839ecef7fc0] the entire error message can be inspected. It can be observed that the "Expand" unary operator is unusually long in the error message.

Code:

{code:java}
def inputSchema: StructType = StructType(
  Seq(
    StructField("timestamp", LongType, nullable = false),
    StructField("missingInfo", LongType, nullable = true),
    StructField("jobId", LongType, nullable = false),
    StructField("taskId", LongType, nullable = false),
    StructField("machineId", LongType, nullable = true),
    StructField("eventType", IntegerType, nullable = false),
    StructField("userId", IntegerType, nullable = true),
    StructField("category", IntegerType, nullable = true),
    StructField("priority", IntegerType, nullable = false),
    StructField("cpu", FloatType, nullable = true),
    StructField("ram", FloatType, nullable = true),
    StructField("disk", FloatType, nullable = true),
    StructField("constraints", IntegerType, nullable = true)
  )
)
{code}

{code:java}
ss.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("enable.auto.commit", "false")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "1000")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .select(col("key"), from_csv(col("value"), inputSchema, Map("delimiter" -> ",")).as("task_event"))
  .withColumn("event_time", (col("task_event.timestamp") / 1000 + startTraceTime).cast(TimestampType))
  .where("task_event.eventType == 1")
  .dropDuplicates("key")
  .withWatermark("event_time", "60 seconds")
  .groupBy(
    window(col("event_time"), "60 seconds", "1 second"),
    col("task_event.jobId")
  ).agg(avg("task_event.cpu").as("avgCpu"))
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
  .awaitTermination()
{code}
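The size of the generated code follows from how Spark plans sliding windows: the analyzer rewrites window(col, windowDuration, slideDuration) into an Expand operator that emits one copy of every input row per window containing it, i.e. roughly windowDuration / slideDuration projections. With a 60-second window and a 1-second slide that is 60 projections, and whole-stage codegen inlines all of them into the single expand_doConsume method, pushing it past the JVM's 64 KB bytecode-per-method limit; a 2-second slide halves the projection count, which is why the error disappears. Below is a minimal sketch, assuming a local spark-shell session and Spark's built-in rate source rather than the reporter's Kafka setup, that makes the oversized Expand node visible:

{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

// Sketch only: local session and the built-in "rate" source, which provides
// a TimestampType column named "timestamp".
val spark = SparkSession.builder().master("local[*]").appName("expand-size").getOrCreate()

// 60s window / 1s slide => every row falls into 60 overlapping windows,
// so the planner inserts an Expand node with 60 projections.
val windowed = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .groupBy(window(col("timestamp"), "60 seconds", "1 second"))
  .count()

// The printed physical plan contains the Expand node; its projection list
// shrinks to 30 entries with a "2 seconds" slide.
windowed.explain(true)
{code}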
> GeneratedIterator grows beyond 64 KB in Spark Structured Streaming application
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-34485
>                 URL: https://issues.apache.org/jira/browse/SPARK-34485
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 3.0.0, 3.0.1
>         Environment: ||Name||Version||
> |OS|Ubuntu 18.04|
> |JAVA|11.0.9|
> |Scala|2.12.10|
> |Spark|3.0.1|
>            Reporter: Kim Min Woo
>            Priority: Minor
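A common mitigation for this class of "grows beyond 64 KB" codegen failures, offered here as a suggestion rather than something tried in this report, is to disable whole-stage code generation so that no single fused method has to hold every Expand projection; the query then runs on the slower per-operator path:

{code:java}
// Hedged workaround sketch: turn off whole-stage codegen for this session.
// Trades some throughput for a query that no longer hits the 64 KB method limit.
// Widening slideDuration, as the reporter observed, instead shrinks the Expand itself.
spark.conf.set("spark.sql.codegen.wholeStage", "false")
{code}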