[ 
https://issues.apache.org/jira/browse/SPARK-34485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kim Min Woo updated SPARK-34485:
--------------------------------
    Description: 
I ran the following Structured Streaming application, but it failed with:

{noformat}
ERROR CodeGenerator: failed to compile: org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass" in "generated.java": Code of method "expand_doConsume_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage3;Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/apache/spark/sql/catalyst/InternalRow;ZJZ)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3" grows beyond 64 KB
{noformat}

The aggregation uses sliding windows created with the following function:
{code:java}
def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column
{code}
 

However, when I increased the slideDuration from 1 second to 2 seconds, the error no longer appeared.

The code is simple, so I don't understand why the GeneratedIterator grows beyond 64 KB.
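
My guess (not verified against the Spark source) is that the window overlap is what blows up the method: with a 60-second window and a 1-second slide, every event falls into 60 overlapping windows, and the Expand operator that implements sliding windows emits one projection per window, so the size of expand_doConsume_0 grows with the ratio windowDuration / slideDuration:

{code:java}
// Back-of-the-envelope estimate; the one-projection-per-window behaviour of
// Expand is an assumption here, not taken verbatim from Spark internals.
val windowSeconds = 60
val projectionsWithOneSecondSlide = windowSeconds / 1 // 60 projections -> fails to compile
val projectionsWithTwoSecondSlide = windowSeconds / 2 // 30 projections -> compiles fine
{code}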

The entire error message is available in [this Gist|https://gist.github.com/c357c28138f7f57505189839ecef7fc0].

In that output it can be seen that the generated code for the *Expand* unary logical operator is unusually long.
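
As a possible mitigation (I have not verified it for this exact query), whole-stage code generation can be disabled so that the oversized method is never compiled, at the cost of running the interpreted path:

{code:java}
// Sketch of a possible workaround, assuming ss is the SparkSession used below.
// Disabling whole-stage codegen avoids generating the huge expand_doConsume_0 method.
ss.conf.set("spark.sql.codegen.wholeStage", "false")
{code}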

Code:

 
{code:java}
import org.apache.spark.sql.types._

  // Schema of the CSV payload carried in the Kafka message value.
  def inputSchema: StructType = StructType(
    Seq(
      StructField("timestamp", LongType, nullable = false),
      StructField("missingInfo", LongType, nullable = true),
      StructField("jobId", LongType, nullable = false),
      StructField("taskId", LongType, nullable = false),
      StructField("machineId", LongType, nullable = true),
      StructField("eventType", IntegerType, nullable = false),
      StructField("userId", IntegerType, nullable = true),
      StructField("category", IntegerType, nullable = true),
      StructField("priority", IntegerType, nullable = false),
      StructField("cpu", FloatType, nullable = true),
      StructField("ram", FloatType, nullable = true),
      StructField("disk", FloatType, nullable = true),
      StructField("constraints", IntegerType, nullable = true),
    )
  )
{code}
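
For reference, the schema can be sanity-checked in isolation against a hand-written row; the sample values below are invented for illustration only:

{code:java}
import org.apache.spark.sql.functions.{col, from_csv, lit}

// Hypothetical CSV value matching inputSchema (13 comma-separated fields).
val sample = "1000000,0,6251,0,42,1,7,2,1,0.5,0.25,0.01,0"
ss.range(1)
  .select(from_csv(lit(sample), inputSchema, Map("delimiter" -> ",")).as("task_event"))
  .select(col("task_event.eventType"), col("task_event.cpu"))
  .show()
{code}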
 

 
{code:java}
import org.apache.spark.sql.functions.{avg, col, from_csv, window}
import org.apache.spark.sql.types.TimestampType

    // ss is the SparkSession; startTraceTime is a numeric epoch offset defined elsewhere.
    ss.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .option("enable.auto.commit", "false")
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", "1000")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .select(col("key"), from_csv(col("value"), inputSchema, Map("delimiter" 
-> ",")).as("task_event"))
      .withColumn("event_time", (col("task_event.timestamp") / 1000 + 
startTraceTime).cast(TimestampType))
      .where("task_event.eventType == 1")
      .dropDuplicates("key")
      .withWatermark("event_time", "60 seconds")
      .groupBy(
        window(col("event_time"), "60 seconds", "1 second"),
        col("task_event.jobId")
      ).agg(avg("task_event.cpu").as("avgCpu"))
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()
      .awaitTermination()
{code}
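
To see the Expand node in the executed plan, the query handle can be kept instead of chaining awaitTermination() directly; df below stands for the aggregated streaming DataFrame built above (a sketch, not verbatim from my application):

{code:java}
// Capture the StreamingQuery, print the physical plan after a batch has run,
// then block as before.
val query = df.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

query.explain() // the Expand node shows one projection per overlapping window
query.awaitTermination()
{code}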
 

> GeneratedIterator grows beyond 64 KB in Structured Streaming application
> ------------------------------------------------------------------------
>
>                 Key: SPARK-34485
>                 URL: https://issues.apache.org/jira/browse/SPARK-34485
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 3.0.0, 3.0.1
>         Environment: ||Name||Version||
> |OS|Ubuntu 18.04|
> |Java|11.0.9|
> |Scala|2.12.10|
> |Spark|3.0.1|
>            Reporter: Kim Min Woo
>            Priority: Minor
>


