Thanks for double-checking the version. Please report back whether or not it works with the 3.1 version.

G

On Tue, 19 Jan 2021, 07:41 Eric Beabes <mailinglist...@gmail.com> wrote:

Confirmed. The cluster admin said his team installed the latest version from Cloudera, which comes with Spark 3.0.0-preview2. They are going to try to upgrade it to the community edition of Spark 3.1.0.

Thanks, Jungtaek, for the tip. Greatly appreciated.

On Tue, Jan 19, 2021 at 8:45 AM Eric Beabes <mailinglist...@gmail.com> wrote:

> "Could you please make sure you're not using 3.0.0-preview?"

This could be the reason. I will check with our Hadoop cluster administrator. It's quite possible that they installed the preview build. Yes, the code works in the local dev environment.

On Tue, Jan 19, 2021 at 5:29 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

I see no issue running this code in local dev (with the scope of the Spark artifacts changed to "compile", of course).

Could you please make sure you're not using 3.0.0-preview? In 3.0.0-preview, update mode was restricted (as the error message says), and the restriction was reverted before Spark 3.0.0 was released. Clearing the Spark artifacts in your .m2 cache may help.
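One quick way to confirm which build is actually on the cluster classpath is to log the runtime version from the driver at startup. A minimal sketch; the "preview" guard is purely illustrative, not something from the thread:

import org.apache.spark.sql.SparkSession

object VersionCheck {
  def main(args: Array[String]): Unit = {
    // Print the Spark build the driver is actually running; a leftover
    // 3.0.0-preview artifact shows up here as e.g. "3.0.0-preview2".
    val spark = SparkSession.builder().appName("VersionCheck").getOrCreate()
    println(s"Runtime Spark version: ${spark.version}")
    // Illustrative guard: fail fast on preview builds, where update mode
    // for v2 streaming sinks was restricted.
    require(!spark.version.contains("preview"),
      s"Preview Spark build detected: ${spark.version}")
    spark.stop()
  }
}

Submitting this with spark-submit on the cluster shows the exact version string the real job sees, independent of what was compiled against locally.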
On Tue, Jan 19, 2021 at 8:50 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

Please also include some test data. I quickly looked through the code, and it may require a specific format for the records.

On Tue, Jan 19, 2021 at 12:10 AM German Schiavon <gschiavonsp...@gmail.com> wrote:

Hi,

This is the jira: <https://issues.apache.org/jira/projects/SPARK/summary>. Regarding the repo, I believe you can just commit it to your personal repo and that should be it.

Regards

On Mon, 18 Jan 2021 at 15:46, Eric Beabes <mailinglist...@gmail.com> wrote:

Sorry, can you please tell me where to create the JIRA? Also, is there any specific GitHub repository I need to commit the code into, or just our own? Please let me know. Thanks.

On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Thank you. As we've asked, could you please create a jira and commit the code to GitHub? It would speed things up a lot.

G

On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <mailinglist...@gmail.com> wrote:

Here's a very simple reproducer app. I've attached 3 files: SparkTest.scala, QueryListener.scala and pom.xml. Copying the contents in the email as well:
package com.myorg

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

import scala.util.{Failure, Success, Try}

object Spark3Test {

  val isLocal = false

  implicit val stringEncoder: Encoder[String] = Encoders.STRING
  implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]

  // The grouping key is the 19-character slice [21, 40) of each record.
  val START_DATE_INDEX = 21
  val END_DATE_INDEX = 40

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal)
    spark.sparkContext.setLogLevel("WARN")

    readKafkaStream(spark)
      .groupByKey(row => row.substring(START_DATE_INDEX, END_DATE_INDEX))
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )
      .filter(row => !row.inProgress)
      .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
      .writeStream
      .format("kafka")
      .option(
        s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
        "10.29.42.141:9092" // "localhost:9092"
      )
      .option("topic", "spark3test")
      .option("checkpointLocation", "/tmp/checkpoint_5")
      .outputMode("update")
      .start()
    manageStreamingQueries(spark)
  }

  def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
    sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.29.42.141:9092")
      .option("subscribe", "inputTopic")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafkaConsumer.pollTimeoutMs", "120000")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String](Encoders.STRING)
  }

  def updateAcrossEvents(key: String, inputs: Iterator[String],
      oldState: GroupState[MyState]): MyState = {
    if (!oldState.exists) {
      println(key)
      val state = MyState(key)
      oldState.update(state)
      oldState.setTimeoutDuration("1 minutes")
      oldState.get
    } else {
      if (oldState.hasTimedOut) {
        // Mark the state finished and emit it one last time before removal.
        oldState.get.inProgress = false
        val state = oldState.get
        println("State timed out for key: " + state.dateTime)
        oldState.remove()
        state
      } else {
        val state = oldState.get
        state.count = state.count + 1
        oldState.update(state)
        oldState.setTimeoutDuration("1 minutes")
        oldState.get
      }
    }
  }

  def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = {
    UserGroupInformation.setLoginUser(
      UserGroupInformation.createRemoteUser("hduser")
    )

    val builder = SparkSession
      .builder()
      .appName(applicationName)

    if (isLocal) {
      builder.config("spark.master", "local[2]")
    }

    builder.getOrCreate()
  }

  def manageStreamingQueries(spark: SparkSession): Unit = {

    val sparkQueryListener = new QueryListener()
    spark.streams.addListener(sparkQueryListener)

    // Creating this marker file triggers a graceful shutdown of all queries.
    val shutdownMarker: String = "/tmp/stop_job"

    val timeoutInMilliSeconds = 60000
    while (!spark.streams.active.isEmpty) {
      Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
        case Success(result) =>
          if (result) {
            println("A streaming query was terminated successfully")
            spark.streams.resetTerminated()
          }
        case Failure(e) =>
          println("Query failed with message: " + e.getMessage)
          e.printStackTrace()
          spark.streams.resetTerminated()
      }

      if (checkMarker(shutdownMarker)) {
        spark.streams.active.foreach(query => {
          println(s"Stopping streaming query: ${query.id}")
          query.stop()
        })
        spark.stop()
        removeMarker(shutdownMarker)
      }
    }
    assert(spark.streams.active.isEmpty)
    spark.streams.removeListener(sparkQueryListener)
  }

  def checkMarker(markerFile: String): Boolean = {
    val fs = FileSystem.get(new Configuration())
    fs.exists(new Path(markerFile))
  }

  def removeMarker(markerFile: String): Unit = {
    val fs = FileSystem.get(new Configuration())
    fs.delete(new Path(markerFile), true)
  }
}

case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)


package com.myorg

import org.apache.spark.sql.streaming.StreamingQueryListener

class QueryListener extends StreamingQueryListener {

  def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    if (event.progress.numInputRows != 0) {
      println(s"InputRows: ${event.progress.numInputRows}")
    }
  }

  def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    println(s"Query with id ${event.id} terminated")
  }
}
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.myorg</groupId>
  <artifactId>spark-3-conversion</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>spark-3-conversion</name>
  <url>http://maven.apache.org</url>

  <properties>
    <spark.version>3.0.0</spark.version>
    <scala.binary.version>2.12</scala.binary.version>
    <scala.version>2.12.10</scala.version>
    <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
    <skipTests>true</skipTests>
    <maven.compiler.source>1.5</maven.compiler.source>
    <maven.compiler.target>1.5</maven.compiler.target>
    <encoding>UTF-8</encoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.binary.version}</artifactId>
      <version>3.0.7</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>install</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <!-- Scala compiler -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>1.7</version>
        <executions>
          <!-- Add src/main/scala to eclipse build path -->
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/main/scala</source>
              </sources>
            </configuration>
          </execution>
          <!-- Add src/test/scala to eclipse build path -->
          <execution>
            <id>add-test-source</id>
            <phase>generate-test-sources</phase>
            <goals>
              <goal>add-test-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/test/scala</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <!-- We disable surefire and enable scalatest so that Maven can run our tests -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.7</version>
        <configuration>
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>1.0</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>WDF TestSuite.txt</filereports>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.scalastyle</groupId>
        <artifactId>scalastyle-maven-plugin</artifactId>
        <version>1.0.0</version>
        <configuration>
          <verbose>false</verbose>
          <failOnViolation>true</failOnViolation>
          <includeTestSourceDirectory>true</includeTestSourceDirectory>
          <failOnWarning>false</failOnWarning>
          <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
          <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
          <configLocation>lib/scalastyle_config.xml</configLocation>
          <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
          <outputEncoding>UTF-8</outputEncoding>
        </configuration>
        <executions>
          <execution>
            <goals>
              <goal>check</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.sonarsource.scanner.maven</groupId>
        <artifactId>sonar-maven-plugin</artifactId>
        <version>3.6.0.1398</version>
      </plugin>
    </plugins>
  </build>
</project>
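On Jungtaek's point about test data: the reproducer keys each record on substring(START_DATE_INDEX, END_DATE_INDEX), i.e. the 19-character slice at offsets 21 through 39, so sample input records must be at least 40 characters long with a date-time in that slice. A hypothetical record builder matching those indices; the actual field layout is not described anywhere in the thread, so the prefix contents here are an assumption:

object TestData {
  // Layout assumed from the indices in Spark3Test: a 21-character prefix
  // (padded here; the real contents are unknown) followed by a 19-character
  // date-time that becomes the grouping key.
  def makeTestRecord(prefix: String, dateTime: String): String = {
    require(dateTime.length == 19, "expected a 19-char date-time, e.g. 2021-01-19 08:45:00")
    prefix.padTo(21, ' ').take(21) + dateTime
  }

  def main(args: Array[String]): Unit = {
    // Example: 40-character record whose substring(21, 40) is the date-time.
    println(makeTestRecord("device-42", "2021-01-19 08:45:00"))
  }
}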
On Wed, Jan 13, 2021 at 11:26 PM Eric Beabes <mailinglist...@gmail.com> wrote:

Ok. I will work on creating a reproducible app. Thanks.

On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Just reached this thread. +1 on creating a simple reproducer app, and I suggest creating a jira and attaching the full driver and executor logs. Ping me on the jira and I'll pick this up right away...

Thanks!

G

On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

Would you mind if I ask for a simple reproducer? It would be nice if you could create a repository on GitHub and push the code, including the build script.

Thanks in advance!

On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes <mailinglist...@gmail.com> wrote:

I tried both. First I tried 3.0.0; that didn't work, so I tried 3.1.0.

On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

Which exact Spark version did you use? Did you make sure the version for Spark and the version for the spark-sql-kafka artifact are the same? (I ask because you've said you used Spark 3.0, but the spark-sql-kafka dependency pointed to 3.1.0.)

On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes <mailinglist...@gmail.com> wrote:

org.apache.spark.sql.streaming.StreamingQueryException: Data source v2 streaming sinks does not support Update mode.
=== Streaming Query ===
Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId = 62410f05-db59-4026-83fc-439a79b01c29]
Current Committed Offsets: {}
Current Available Offsets: {}
Current State: INITIALIZING
Thread State: RUNNABLE
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.IllegalArgumentException: Data source v2 streaming sinks does not support Update mode.
    at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
    ... 1 more

Please see the attached image for more information.
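The failing frame in the trace is StreamExecution.createStreamingWrite, the check that (per Jungtaek's note above) rejected Update mode for v2 streaming sinks in the preview builds. For anyone who cannot upgrade, a possible fallback is foreachBatch, which writes each micro-batch through the batch Kafka writer rather than the v2 streaming sink and so may avoid that check. A sketch only, not something suggested in the thread, assuming the same mapped Dataset[String] and broker/topic as the reproducer:

import org.apache.spark.sql.Dataset

// `results` stands for the Dataset[String] produced by the reproducer's
// mapGroupsWithState pipeline (an assumption; it is not defined here).
def startWithForeachBatch(results: Dataset[String]): Unit = {
  // Assigning the lambda to a typed val avoids the Scala 2.12
  // foreachBatch overload ambiguity.
  val writeToKafka: (Dataset[String], Long) => Unit = (batchDf, _) =>
    // A Dataset[String] exposes a single "value" column, which is
    // exactly what the batch Kafka writer requires.
    batchDf.write
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.29.42.141:9092")
      .option("topic", "spark3test")
      .save()

  results.writeStream
    .foreachBatch(writeToKafka)
    .option("checkpointLocation", "/tmp/checkpoint_5")
    .outputMode("update")
    .start()
}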
On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski <ja...@japila.pl> wrote:

Hi,

Can you post the whole message? I'm trying to find what might be causing it. A small reproducible example would be of help too. Thank you.

Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes <mailinglist...@gmail.com> wrote:

I'm trying to port my Spark 2.4 based (Structured) Streaming application to Spark 3.0. I compiled it using the dependency given below:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
    <version>3.1.0</version>
</dependency>

Every time I run it under Spark 3.0, I get this message: "Data source v2 streaming sinks does not support Update mode."

I am using 'mapGroupsWithState', so as per this link
(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
the only supported output mode is "Update".

My sink is a Kafka topic, so I am using this:

.writeStream
.format("kafka")

What am I missing?

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org