[jira] [Updated] (SPARK-34485) GeneratedIterator grows beyond 64 KB in Spark Structured Streaming application

2021-02-20 Thread Kim Min Woo (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kim Min Woo updated SPARK-34485:

Description: 
I ran the following Spark Structured Streaming application, but it failed with:

ERROR CodeGenerator: failed to compile: org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass" in "generated.java": Code of method "expand_doConsume_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage3;Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/apache/spark/sql/catalyst/InternalRow;ZJZ)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3" grows beyond 64 KB

 
The failing job uses the three-argument (sliding) variant of the window function:

{code:java}
def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column
{code}
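
For reference, here is my rough reasoning (an assumption on my part, not taken from the Spark docs): with a sliding window, each event is assigned to about windowDuration / slideDuration overlapping windows, assuming the duration is an exact multiple of the slide:

{code:java}
// Rough sketch of window multiplicity per event (assumption: windowDuration
// is an exact multiple of slideDuration, as in this job).
def windowsPerEvent(windowSec: Int, slideSec: Int): Int = windowSec / slideSec

windowsPerEvent(60, 1) // = 60 -> compilation fails
windowsPerEvent(60, 2) // = 30 -> no error
{code}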
 

However, if I increase the slideDuration from 1 second to 2 seconds, the error no longer appears.

The code is simple, but I don't understand why the GeneratedIterator grows beyond 64 KB.

The full error output can be checked at [this Gist|https://gist.github.com/c357c28138f7f57505189839ecef7fc0].

In the error output, the section corresponding to the *Expand* unary logical operator is unusually long.
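
My guess (an assumption, not verified against the planner source) is that the sliding window is planned as an Expand node that emits one projection per candidate window, i.e. about 60 projections per input row here, and whole-stage codegen inlines all of them into the single expand_doConsume method named in the error. The plan can be inspected before starting the stream, e.g.:

{code:java}
// Hypothetical inspection: df stands for the streaming frame built in the
// code below, just before .writeStream. The Expand node in the printed
// plan should list one projection per candidate window.
df.groupBy(
    window(col("event_time"), "60 seconds", "1 second"),
    col("task_event.jobId")
  )
  .agg(avg("task_event.cpu").as("avgCpu"))
  .explain(true)
{code}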

Code:

 
{code:java}
import org.apache.spark.sql.types._

def inputSchema: StructType = StructType(
  Seq(
    StructField("timestamp", LongType, nullable = false),
    StructField("missingInfo", LongType, nullable = true),
    StructField("jobId", LongType, nullable = false),
    StructField("taskId", LongType, nullable = false),
    StructField("machineId", LongType, nullable = true),
    StructField("eventType", IntegerType, nullable = false),
    StructField("userId", IntegerType, nullable = true),
    StructField("category", IntegerType, nullable = true),
    StructField("priority", IntegerType, nullable = false),
    StructField("cpu", FloatType, nullable = true),
    StructField("ram", FloatType, nullable = true),
    StructField("disk", FloatType, nullable = true),
    StructField("constraints", IntegerType, nullable = true)
  )
)
{code}
 

 
{code:java}
import org.apache.spark.sql.functions.{avg, col, from_csv, window}
import org.apache.spark.sql.types.TimestampType

// ss (the SparkSession) and startTraceTime are defined elsewhere in the job.
ss.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("enable.auto.commit", "false")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "1000")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .select(col("key"), from_csv(col("value"), inputSchema, Map("delimiter" -> ",")).as("task_event"))
  .withColumn("event_time", (col("task_event.timestamp") / 1000 + startTraceTime).cast(TimestampType))
  .where("task_event.eventType == 1")
  .dropDuplicates("key")
  .withWatermark("event_time", "60 seconds")
  .groupBy(
    window(col("event_time"), "60 seconds", "1 second"),
    col("task_event.jobId")
  )
  .agg(avg("task_event.cpu").as("avgCpu"))
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
  .awaitTermination()
{code}
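
As a possible workaround until the root cause is addressed (an untested sketch, not a confirmed fix), whole-stage code generation can be disabled so that Spark falls back to the iterator-based execution path instead of compiling the oversized method:

{code:java}
// Untested workaround sketch: disable whole-stage codegen for the session.
// This avoids the 64 KB bytecode limit at some cost in CPU efficiency.
ss.conf.set("spark.sql.codegen.wholeStage", "false")
{code}

Spark also has spark.sql.codegen.hugeMethodLimit for falling back on oversized generated methods, but I have not tested whether it helps here.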
 
