Here's a very simple reproducer app. I've attached three files: Spark3Test.scala, QueryListener.scala and pom.xml. I'm also copying their contents into this email:
package com.myorg import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.security.UserGroupInformation import org.apache.kafka.clients.producer.ProducerConfig import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout} import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession} import scala.util.{Failure, Success, Try} /** Reproducer for the Spark 3.0 "Data source v2 streaming sinks does not support Update mode" report. Reads string values from Kafka topic "inputTopic", groups them by characters [START_DATE_INDEX, END_DATE_INDEX) of each record, keeps a per-key MyState via mapGroupsWithState with a processing-time timeout, and writes "key: ... count: ..." strings for states that have timed out to Kafka topic "spark3test" in update output mode. NOTE(review): this copy was flattened onto a few lines by the mail archive, so the line comment after the producer bootstrap address below would swallow the rest of that line if compiled as-is; compile from the attached source file instead. */ object Spark3Test { val isLocal = false implicit val stringEncoder: Encoder[String] = Encoders.STRING implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState] val START_DATE_INDEX = 21 val END_DATE_INDEX = 40 /** Entry point: builds the SparkSession, starts the Kafka-to-Kafka streaming query and then blocks in manageStreamingQueries. NOTE(review): groupByKey uses row.substring(START_DATE_INDEX, END_DATE_INDEX), which throws for records shorter than END_DATE_INDEX characters; the broker address and the checkpoint location are hard-coded. */ def main(args: Array[String]) { val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal) spark.sparkContext.setLogLevel("WARN") readKafkaStream(spark) .groupByKey(row => { row.substring(START_DATE_INDEX, END_DATE_INDEX) }) .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())( updateAcrossEvents ) .filter(row => !row.inProgress) .map(row => "key: " + row.dateTime + " " + "count: " + row.count) /* Update output mode is used below because, as quoted in the thread, mapGroupsWithState only supports Update. The reported IllegalArgumentException is raised while this query initializes. NOTE(review): confirm the spark-sql-kafka-0-10 artifact and the cluster Spark runtime are the same version. */ .writeStream .format("kafka") .option( s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}", "10.29.42.141:9092" // "localhost:9092" ) .option("topic", "spark3test") .option("checkpointLocation", "/tmp/checkpoint_5") .outputMode("update") .start() manageStreamingQueries(spark) } /** Subscribes to Kafka topic "inputTopic" starting from the latest offsets, with failOnDataLoss set to false, and returns each record value cast to a String. */ def readKafkaStream(sparkSession: SparkSession): Dataset[String] = { val stream = sparkSession.readStream .format("kafka") .option("kafka.bootstrap.servers", "10.29.42.141:9092") .option("subscribe", "inputTopic") .option("startingOffsets", "latest") .option("failOnDataLoss", "false") .option("kafkaConsumer.pollTimeoutMs", "120000") .load() .selectExpr("CAST(value AS STRING)") .as[String](Encoders.STRING) stream } /** mapGroupsWithState update function. First call for a key: creates MyState(key) (count 1, inProgress true) and arms a 1-minute timeout. On timeout: clears inProgress, removes the state and returns it; this is the only path whose output passes the downstream filter. Otherwise: increments count and re-arms the timeout. NOTE(review): inputs is never read, so count grows by one per invocation that has data for the key, not per record; confirm that is intended. */ def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: GroupState[MyState]): MyState = { if (!oldState.exists) { println(key) val state = MyState(key) oldState.update(state) oldState.setTimeoutDuration("1 minutes") oldState.get } 
else { if (oldState.hasTimedOut) { oldState.get.inProgress = false val state = oldState.get println("State timed out for key: " + state.dateTime) oldState.remove() state } else { val state = oldState.get state.count = state.count + 1 oldState.update(state) oldState.setTimeoutDuration("1 minutes") oldState.get } } } /** Sets the Hadoop login user to "hduser" and returns a SparkSession named applicationName. The master is set to local[2] only when isLocal is true; the result of builder.config is discarded, relying on the builder being updated in place. */ def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = { UserGroupInformation.setLoginUser( UserGroupInformation.createRemoteUser("hduser") ) val builder = SparkSession .builder() .appName(applicationName) if (isLocal) { builder.config("spark.master", "local[2]") } builder.getOrCreate() } /** Blocks while any streaming query is active. Registers a QueryListener, then waits up to 60 s at a time for a query to terminate, printing and resetting terminated queries (including failed ones). When the shutdown marker file exists, it stops every active query, stops the SparkSession and deletes the marker. NOTE(review): after the marker path, the loop condition and removeListener run against an already-stopped session; confirm that is safe. */ def manageStreamingQueries(spark: SparkSession): Unit = { val sparkQueryListener = new QueryListener() spark.streams.addListener(sparkQueryListener) val shutdownMarker: String = "/tmp/stop_job" val timeoutInMilliSeconds = 60000 while (!spark.streams.active.isEmpty) { Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match { case Success(result) => if (result) { println("A streaming query was terminated successfully") spark.streams.resetTerminated() } case Failure(e) => println("Query failed with message: " + e.getMessage) e.printStackTrace() spark.streams.resetTerminated() } if (checkMarker(shutdownMarker)) { spark.streams.active.foreach(query => { println(s"Stopping streaming query: ${query.id}") query.stop() }) spark.stop() removeMarker(shutdownMarker) } } assert(spark.streams.active.isEmpty) spark.streams.removeListener(sparkQueryListener) } /** Returns true if markerFile exists on the default Hadoop FileSystem obtained from a fresh Configuration. */ def checkMarker(markerFile: String): Boolean = { val fs = FileSystem.get(new Configuration()) fs.exists(new Path(markerFile)) } /** Deletes markerFile (recursive flag set) from the default Hadoop FileSystem obtained from a fresh Configuration. */ def removeMarker(markerFile: String): Unit = { val fs = FileSystem.get(new Configuration()) fs.delete(new Path(markerFile), true) } } /** Mutable per-key streaming state, encoded with Kryo via myStateEncoder. dateTime holds the grouping key, inProgress is cleared on timeout, and count starts at 1. */ case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1) package com.myorg import org.apache.spark.sql.streaming.StreamingQueryListener /** StreamingQueryListener that prints the input row count of progress events with a non-zero row count and the id of terminated queries; query-start events are ignored. */ class QueryListener extends StreamingQueryListener { def 
onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {} def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = { if (event.progress.numInputRows != 0) { println( s"InputRows: ${event.progress.numInputRows}" ) } } def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { println(s"Query with id ${event.id} terminated") } } <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.myorg</groupId> <artifactId>spark-3-conversion</artifactId> <packaging>jar</packaging> <version>1.0-SNAPSHOT</version> <name>spark-3-conversion</name> <url>http://maven.apache.org</url> <properties> <spark.version>3.0.0</spark.version> <scala.binary.version>2.12</scala.binary.version> <scala.version>2.12.10</scala.version> <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version> <skipTests>true</skipTests> <maven.compiler.source>1.5</maven.compiler.source> <maven.compiler.target>1.5</maven.compiler.target> <encoding>UTF-8</encoding> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> 
<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> <scope>runtime</scope> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.binary.version}</artifactId> <version>3.0.7</version> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.0.0</version> <executions> <!-- Run shade goal on package phase --> <execution> <phase>install</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> <!-- Scala Compiler --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>build-helper-maven-plugin</artifactId> <version>1.7</version> <executions> <!-- Add src/main/scala to eclipse build path --> <execution> <id>add-source</id> <phase>generate-sources</phase> <goals> <goal>add-source</goal> </goals> <configuration> <sources> <source>src/main/scala</source> </sources> </configuration> </execution> <!-- Add src/test/scala to eclipse build path --> <execution> <id>add-test-source</id> <phase>generate-test-sources</phase> <goals> <goal>add-test-source</goal> 
</goals> <configuration> <sources> <source>src/test/scala</source> </sources> </configuration> </execution> </executions> </plugin> <!-- we disable surefile and enable scalatest so that maven can run our tests --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.7</version> <configuration> <skipTests>true</skipTests> </configuration> </plugin> <plugin> <groupId>org.scalatest</groupId> <artifactId>scalatest-maven-plugin</artifactId> <version>1.0</version> <configuration> <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> <junitxml>.</junitxml> <filereports>WDF TestSuite.txt</filereports> </configuration> <executions> <execution> <id>test</id> <goals> <goal>test</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.scalastyle</groupId> <artifactId>scalastyle-maven-plugin</artifactId> <version>1.0.0</version> <configuration> <verbose>false</verbose> <failOnViolation>true</failOnViolation> <includeTestSourceDirectory>true</includeTestSourceDirectory> <failOnWarning>false</failOnWarning> <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory> <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory> <configLocation>lib/scalastyle_config.xml</configLocation> <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile> <outputEncoding>UTF-8</outputEncoding> </configuration> <executions> <execution> <goals> <goal>check</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.sonarsource.scanner.maven</groupId> <artifactId>sonar-maven-plugin</artifactId> <version>3.6.0.1398</version> </plugin> </plugins> </build> </project> On Wed, Jan 13, 2021 at 11:26 PM Eric Beabes <mailinglist...@gmail.com> wrote: > Ok. I will work on creating a reproducible app. Thanks. > > On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi <gabor.g.somo...@gmail.com> > wrote: > >> Just reached this thread.
+1 on to create a simple reproducer app and I >> suggest to create a jira attaching the full driver and executor logs. >> Ping me on the jira and I'll pick this up right away... >> >> Thanks! >> >> G >> >> >> On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim < >> kabhwan.opensou...@gmail.com> wrote: >> >>> Would you mind if I ask for a simple reproducer? Would be nice if you >>> could create a repository in Github and push the code including the build >>> script. >>> >>> Thanks in advance! >>> >>> On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes <mailinglist...@gmail.com> >>> wrote: >>> >>>> I tried both. First tried 3.0.0. That didn't work so I tried 3.1.0. >>>> >>>> On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim < >>>> kabhwan.opensou...@gmail.com> wrote: >>>> >>>>> Which exact Spark version did you use? Did you make sure the version >>>>> for Spark and the version for spark-sql-kafka artifact are the same? (I >>>>> asked this because you've said you've used Spark 3.0 but spark-sql-kafka >>>>> dependency pointed to 3.1.0.) >>>>> >>>>> On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes <mailinglist...@gmail.com> >>>>> wrote: >>>>> >>>>>> org.apache.spark.sql.streaming.StreamingQueryException: Data source >>>>>> v2 streaming sinks does not support Update mode. === Streaming Query === >>>>>> Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId = >>>>>> 62410f05-db59-4026-83fc-439a79b01c29] Current Committed Offsets: {} >>>>>> Current >>>>>> Available Offsets: {} Current State: INITIALIZING Thread State: RUNNABLE >>>>>> at >>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353) >>>>>> at >>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244) >>>>>> Caused by: java.lang.IllegalArgumentException: Data source v2 streaming >>>>>> sinks does not support Update mode. 
at >>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635) >>>>>> at >>>>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130) >>>>>> at >>>>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61) >>>>>> at >>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320) >>>>>> ... 1 more >>>>>> >>>>>> >>>>>> *Please see the attached image for more information.* >>>>>> >>>>>> >>>>>> On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski <ja...@japila.pl> >>>>>> wrote: >>>>>> >>>>>>> Hi, >>>>>>> >>>>>>> Can you post the whole message? I'm trying to find what might be >>>>>>> causing it. A small reproducible example would be of help too. Thank >>>>>>> you. >>>>>>> >>>>>>> Pozdrawiam, >>>>>>> Jacek Laskowski >>>>>>> ---- >>>>>>> https://about.me/JacekLaskowski >>>>>>> "The Internals Of" Online Books <https://books.japila.pl/> >>>>>>> Follow me on https://twitter.com/jaceklaskowski >>>>>>> >>>>>>> <https://twitter.com/jaceklaskowski> >>>>>>> >>>>>>> >>>>>>> On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes < >>>>>>> mailinglist...@gmail.com> wrote: >>>>>>> >>>>>>>> Trying to port my Spark 2.4 based (Structured) streaming >>>>>>>> application to Spark 3.0. 
I compiled it using the dependency given >>>>>>>> below: >>>>>>>> >>>>>>>> <dependency> >>>>>>>> <groupId>org.apache.spark</groupId> >>>>>>>> >>>>>>>> <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId> >>>>>>>> <version>3.1.0</version> >>>>>>>> </dependency> >>>>>>>> >>>>>>>> >>>>>>>> Every time I run it under Spark 3.0, I get this message: *Data >>>>>>>> source v2 streaming sinks does not support Update mode* >>>>>>>> >>>>>>>> I am using '*mapGroupsWithState*' so as per this link ( >>>>>>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes), >>>>>>>> the only supported Output mode is "*Update*". >>>>>>>> >>>>>>>> My Sink is a Kafka topic so I am using this: >>>>>>>> >>>>>>>> .writeStream >>>>>>>> .format("kafka") >>>>>>>> >>>>>>>> >>>>>>>> What am I missing? >>>>>>>> >>>>>>>> >>>>>>>> >>>>>> --------------------------------------------------------------------- >>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >>>>> >>>>>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <!--
    Build for the Spark 3 Structured Streaming reproducer (Spark3Test, QueryListener).
    Spark itself is "provided" by the cluster at runtime. The Kafka connectors, scala-library
    and the logging jars are bundled into the shaded jar. Keep spark.version equal to the
    Spark version of the cluster the jar is submitted to, so that the bundled
    spark-sql-kafka-0-10 connector matches the provided Spark runtime.
  -->
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.myorg</groupId>
  <artifactId>spark-3-conversion</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>spark-3-conversion</name>
  <url>http://maven.apache.org</url>

  <properties>
    <spark.version>3.0.0</spark.version>
    <scala.binary.version>2.12</scala.binary.version>
    <scala.version>2.12.10</scala.version>
    <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
    <skipTests>true</skipTests>
    <!-- Spark 3.0 and Scala 2.12 require Java 8; javac from JDK 9 onward no longer accepts source/target 1.5. -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <!-- Standard encoding property read by the build plugins; avoids platform-dependent source encoding. -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- Spark runtime: supplied by the cluster, so it is not bundled. -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- DStream Kafka connector. NOTE(review): Spark3Test only uses the Structured Streaming
         Kafka source/sink (spark-sql-kafka-0-10), so this looks removable; confirm. -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- Structured Streaming Kafka source/sink; bundled, so it must match the cluster Spark version. -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.binary.version}</artifactId>
      <version>3.0.7</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <!-- Run shade goal on package phase, so that "mvn package" already produces the
               self-contained jar (it was bound to install, which "mvn package" never reaches). -->
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <!-- Merge META-INF/services entries so ServiceLoader registrations of bundled jars
                     (e.g. the DataSourceRegister behind format("kafka")) survive shading. -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <!-- Scala Compiler -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>1.7</version>
        <executions>
          <!-- Add src/main/scala to eclipse build path -->
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/main/scala</source>
              </sources>
            </configuration>
          </execution>
          <!-- Add src/test/scala to eclipse build path -->
          <execution>
            <id>add-test-source</id>
            <phase>generate-test-sources</phase>
            <goals>
              <goal>add-test-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/test/scala</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <!-- we disable surefire and enable scalatest so that maven can run our tests -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.7</version>
        <configuration>
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>1.0</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>WDF TestSuite.txt</filereports>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.scalastyle</groupId>
        <artifactId>scalastyle-maven-plugin</artifactId>
        <version>1.0.0</version>
        <configuration>
          <verbose>false</verbose>
          <failOnViolation>true</failOnViolation>
          <includeTestSourceDirectory>true</includeTestSourceDirectory>
          <failOnWarning>false</failOnWarning>
          <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
          <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
          <configLocation>lib/scalastyle_config.xml</configLocation>
          <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
          <outputEncoding>UTF-8</outputEncoding>
        </configuration>
        <executions>
          <execution>
            <goals>
              <goal>check</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.sonarsource.scanner.maven</groupId>
        <artifactId>sonar-maven-plugin</artifactId>
        <version>3.6.0.1398</version>
      </plugin>
    </plugins>
  </build>
</project>
QueryListener.scala
Description: Binary data
Spark3Test.scala
Description: Binary data
--------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org