I must say... *Spark has let me down in this case*. I am surprised that an issue this important hasn't been fixed in Spark 2.4.
I am fighting a battle of 'Spark Structured Streaming' vs. 'Flink' at work, and now, because Spark 2.4 can't handle this, *I've been asked to rewrite the code in Flink*. Moving to Spark 3.0 is not an easy option because Cloudera 6.2 doesn't have a Spark 3.0 parcel, so we can't upgrade to 3.0. So sad. Let me ask one more time: *is there no way to fix this in Spark 2.4?*

On Tue, Nov 10, 2020 at 11:33 AM Eric Beabes <mailinglist...@gmail.com> wrote:
> BTW, we are seeing this message as well:
> *"org.apache.kafka.common.KafkaException: Producer closed while send in progress"*.
> I am assuming this happens because of the previous issue ("producer has been closed"), right? Or are they unrelated? Please advise. Thanks.
>
> On Tue, Nov 10, 2020 at 11:17 AM Eric Beabes <mailinglist...@gmail.com> wrote:
>> Thanks for the reply. We are on Spark 2.4. Is there no way to get this fixed in Spark 2.4?
>>
>> On Mon, Nov 2, 2020 at 8:32 PM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:
>>> Which Spark version do you use? There's a known issue with the Kafka producer pool in Spark 2.x which was fixed in Spark 3.0, so you'd want to check whether your case is bound to the known issue or not.
>>>
>>> https://issues.apache.org/jira/browse/SPARK-21869
>>>
>>> On Tue, Nov 3, 2020 at 1:53 AM Eric Beabes <mailinglist...@gmail.com> wrote:
>>>> I know this is related to Kafka, but it happens during the Spark Structured Streaming job; that's why I am asking on this mailing list.
>>>>
>>>> How would you debug this or get around this in Spark Structured Streaming? Any tips would be appreciated. Thanks.
>>>>
>>>> java.lang.IllegalStateException: Cannot perform operation after producer has been closed
>>>>     at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:853)
>>>>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:862)
>>>>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
>>>>     at org.apache.spark.sql.kafka010.KafkaRowWriter.sendRow(KafkaWriteTask.scala:92)
>>>>     at org.apache.spark.sql.kafka010.KafkaStreamDataWriter.write(KafkaStreamWriter.scala:95)
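The stack trace above shows `KafkaProducer.send` failing its closed check inside Spark's Kafka writer. As we understand SPARK-21869, the Spark 2.x producer cache can close a cached producer while a task is still writing with it. The dependency-free Java model below is a hypothetical sketch built on that assumption. `FakeProducer`, `ProducerCache` and `simulate` are invented names, not Spark or Kafka APIs, and this is not Spark 3.0's actual implementation. The model compares two expiry policies:

- Closing the producer as soon as its timeout fires (naive).
- Deferring the close until the last task releases it (reference counting).

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

public class ProducerCacheSketch {

    // Hypothetical stand-in for KafkaProducer; NOT the real Kafka client API.
    static final class FakeProducer {
        private boolean closed = false;

        void send(String record) {
            // Mimics the closed check seen in the stack trace; `record` is discarded.
            if (closed) {
                throw new IllegalStateException(
                    "Cannot perform operation after producer has been closed");
            }
        }

        void close() { closed = true; }

        boolean isClosed() { return closed; }
    }

    // Hypothetical cache that shares one producer per config key.
    static final class ProducerCache {
        private final boolean refCounted;
        private final Map<String, FakeProducer> byKey = new HashMap<>();
        private final Map<FakeProducer, Integer> inUse = new IdentityHashMap<>();
        private final Map<FakeProducer, Boolean> expiredWhileInUse = new IdentityHashMap<>();

        ProducerCache(boolean refCounted) { this.refCounted = refCounted; }

        synchronized FakeProducer acquire(String key) {
            FakeProducer p = byKey.computeIfAbsent(key, k -> new FakeProducer());
            inUse.merge(p, 1, Integer::sum);
            return p;
        }

        synchronized void release(FakeProducer p) {
            Integer n = inUse.get(p);
            if (n == null) return;                       // already force-closed by naive expiry
            if (n > 1) { inUse.put(p, n - 1); return; }  // other tasks still hold it
            inUse.remove(p);
            // Last user is gone: only now is it safe to close an expired producer.
            if (expiredWhileInUse.remove(p) != null) p.close();
        }

        // Called when the idle timeout fires for `key`.
        synchronized void expire(String key) {
            FakeProducer p = byKey.remove(key);
            if (p == null) return;
            if (!refCounted || !inUse.containsKey(p)) {
                p.close();                                // naive: closes even mid-write
                inUse.remove(p);
            } else {
                expiredWhileInUse.put(p, Boolean.TRUE);  // defer close to release()
            }
        }
    }

    // One task acquires a producer and sends a row. The timeout then fires while
    // the task still holds the producer, and the task sends again.
    // Returns "ok" or the exception message.
    static String simulate(boolean refCounted) {
        ProducerCache cache = new ProducerCache(refCounted);
        String key = "bootstrap.servers=broker:9092";   // hypothetical config key
        FakeProducer p = cache.acquire(key);
        try {
            p.send("row-1");
            cache.expire(key);
            p.send("row-2");
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        } finally {
            cache.release(p);
        }
    }

    public static void main(String[] args) {
        System.out.println("naive:       " + simulate(false));
        System.out.println("ref-counted: " + simulate(true));
    }
}
```

Running `main` prints the same `IllegalStateException` message as in the trace for the naive cache, and `ok` for the reference-counted one. The model is only meant to help reason about the trace. It is not a patch for Spark 2.4.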