Hi all:

I am having a strange issue incorporating `groupBy` statements into a
structured streaming job when trying to write to Kafka or Delta. Weirdly, it
only appears to work if I write to the console or memory sinks...

I'm running Spark 3.0.1 with the following dependencies:
io.delta:delta-core_2.12:0.7.0
org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1
org.apache.hadoop:hadoop-azure:3.2.1
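
In case it's relevant, the session is created roughly like this (I'm assuming here that the packages are pulled in via spark.jars.packages; the app name is just a placeholder):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("kafka-groupby-test") \
    .config("spark.jars.packages",
            "io.delta:delta-core_2.12:0.7.0,"
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,"
            "org.apache.hadoop:hadoop-azure:3.2.1") \
    .getOrCreate()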

Here's an example of the PySpark job I've been testing with:

from pyspark.sql.functions import count, lit

kafka = spark.readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers","kafka-broker:29092")\
        .option("subscribe","test")\
        .option("startingOffsets", "earliest")\
        .option("maxOffsetsPerTrigger", 5) \
        .load()

rawDF = kafka.selectExpr("CAST(value AS STRING)")

groupDF = rawDF.groupBy("value").agg(count(lit(1)).alias("count"))

kafka_stream_output = groupDF.selectExpr("to_json(struct(*)) AS value")

kafka_stream_output \
    .writeStream \
    .format("kafka") \
    .outputMode("update") \
    .option("kafka.bootstrap.servers", "kafka-broker:29092") \
    .option("topic", "sink") \
    .option("checkpointLocation", checkpoint_location) \
    .start()
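
For what it's worth, the same aggregation streams fine when I swap the sink to console, roughly like this (the truncate option is only there for readability):

groupDF \
    .writeStream \
    .format("console") \
    .outputMode("update") \
    .option("truncate", "false") \
    .start()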
If I don't include the groupBy/aggregation, the job streams out to Kafka
perfectly fine; but with the aggregation, it writes a couple of messages to
the sink and then fails with a fairly opaque error:

Caused by: org.apache.spark.util.TaskCompletionListenerException: Self-suppression not permitted
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
        at org.apache.spark.scheduler.Task.run(Task.scala:143)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

org.apache.spark.scheduler.TaskSetManager: Task 1 in stage 1.0 failed 4 times; aborting job

[org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec] Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@7337dcc is aborting.


org.apache.spark.SparkException: Writing job aborted.
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:413)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:322)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:329)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3625)
        at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2938)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:2938)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:576)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:571)

I have tried with and without a watermark, and various output modes; everything
results in the same error.
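
For reference, the watermarked variant I tried looked roughly like this (pulling the Kafka message timestamp through; the 10-minute watermark/window is arbitrary):

from pyspark.sql.functions import count, lit, window

rawDF = kafka.selectExpr("CAST(value AS STRING)", "timestamp")

groupDF = rawDF \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(window("timestamp", "10 minutes"), "value") \
    .agg(count(lit(1)).alias("count"))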
Any help would be greatly appreciated!



