I see no issue when running this code in local dev (after changing the scope
of the Spark artifacts to "compile", of course).

Could you please make sure you're not using "3.0.0-preview"? In
3.0.0-preview, update mode was restricted (as the error message says), and
that restriction was reverted before Spark 3.0.0 was released. Clearing the
Spark artifacts in your .m2 cache may help.
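
As a quick check, something along these lines prints the Spark version the driver actually loads and the jar it came from. This is only a sketch: SparkVersionCheck, isPreview and the "--local" flag are names made up for illustration, and it assumes a preview build reports a version string containing "preview".

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper, not part of the reproducer.
object SparkVersionCheck {

  // Assumption: preview builds report a version string containing "preview",
  // e.g. "3.0.0-preview".
  def isPreview(version: String): Boolean =
    version.toLowerCase.contains("preview")

  def main(args: Array[String]): Unit = {
    val builder = SparkSession.builder().appName("spark-version-check")
    // "--local" is a made-up flag for running outside spark-submit.
    if (args.contains("--local")) builder.master("local[1]")
    val spark = builder.getOrCreate()

    // Version of the Spark classes actually on the runtime classpath.
    println(s"Spark version: ${spark.version}")
    println(s"Preview build: ${isPreview(spark.version)}")

    // Jar (or directory) that SparkSession was loaded from, if the JVM exposes it.
    val location = for {
      source <- Option(classOf[SparkSession].getProtectionDomain.getCodeSource)
      url    <- Option(source.getLocation)
    } yield url.toString
    println(s"Loaded from: ${location.getOrElse("<unknown>")}")

    spark.stop()
  }
}
```

Running it the same way you submit the failing job (same cluster, same jar) should show whether a preview artifact is being picked up.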

On Tue, Jan 19, 2021 at 8:50 AM Jungtaek Lim <kabhwan.opensou...@gmail.com>
wrote:

> Please also include some test data. I quickly looked through the code,
> and it appears to require a specific record format.
>
> On Tue, Jan 19, 2021 at 12:10 AM German Schiavon <gschiavonsp...@gmail.com>
> wrote:
>
>> Hi,
>>
>> This is the JIRA <https://issues.apache.org/jira/projects/SPARK/summary>.
>> As for the repo, I believe you can just commit it to your personal repo and
>> that should be it.
>>
>> Regards
>>
>> On Mon, 18 Jan 2021 at 15:46, Eric Beabes <mailinglist...@gmail.com>
>> wrote:
>>
>>> Sorry. Can you please tell me where to create the JIRA? Also is there
>>> any specific Github repository I need to commit code into - OR - just in
>>> our own? Please let me know. Thanks.
>>>
>>> On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> Thank you. As we've asked, could you please create a JIRA and commit
>>>> the code to GitHub?
>>>> It would speed things up a lot.
>>>>
>>>> G
>>>>
>>>>
>>>> On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <mailinglist...@gmail.com>
>>>> wrote:
>>>>
>>>>> Here's a very simple reproducer app. I've attached 3 files:
>>>>> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
>>>>> email as well:
>>>>>
>>>>> package com.myorg
>>>>>
>>>>> import org.apache.hadoop.conf.Configuration
>>>>> import org.apache.hadoop.fs.{FileSystem, Path}
>>>>> import org.apache.hadoop.security.UserGroupInformation
>>>>> import org.apache.kafka.clients.producer.ProducerConfig
>>>>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
>>>>> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>>>>>
>>>>> import scala.util.{Failure, Success, Try}
>>>>>
>>>>> object Spark3Test {
>>>>>
>>>>>   val isLocal = false
>>>>>
>>>>>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>>>>>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>>>>>
>>>>>   val START_DATE_INDEX = 21
>>>>>   val END_DATE_INDEX = 40
>>>>>
>>>>>   def main(args: Array[String]): Unit = {
>>>>>
>>>>>     val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal)
>>>>>     spark.sparkContext.setLogLevel("WARN")
>>>>>
>>>>>     readKafkaStream(spark)
>>>>>       .groupByKey(row => {
>>>>>         row.substring(START_DATE_INDEX, END_DATE_INDEX)
>>>>>       })
>>>>>       .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>         updateAcrossEvents
>>>>>       )
>>>>>       .filter(row => !row.inProgress)
>>>>>       .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>>>>>       .writeStream
>>>>>       .format("kafka")
>>>>>       .option(
>>>>>         s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
>>>>>         "10.29.42.141:9092"
>>>>> //        "localhost:9092"
>>>>>       )
>>>>>       .option("topic", "spark3test")
>>>>>       .option("checkpointLocation", "/tmp/checkpoint_5")
>>>>>       .outputMode("update")
>>>>>       .start()
>>>>>     manageStreamingQueries(spark)
>>>>>   }
>>>>>
>>>>>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>>>>>
>>>>>     val stream = sparkSession.readStream
>>>>>       .format("kafka")
>>>>>       .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>>>>>       .option("subscribe", "inputTopic")
>>>>>       .option("startingOffsets", "latest")
>>>>>       .option("failOnDataLoss", "false")
>>>>>       .option("kafkaConsumer.pollTimeoutMs", "120000")
>>>>>       .load()
>>>>>       .selectExpr("CAST(value AS STRING)")
>>>>>       .as[String](Encoders.STRING)
>>>>>     stream
>>>>>   }
>>>>>
>>>>>   def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: GroupState[MyState]): MyState = {
>>>>>     if (!oldState.exists) {
>>>>>       println(key)
>>>>>       val state = MyState(key)
>>>>>       oldState.update(state)
>>>>>       oldState.setTimeoutDuration("1 minutes")
>>>>>       oldState.get
>>>>>     } else {
>>>>>       if (oldState.hasTimedOut) {
>>>>>         oldState.get.inProgress = false
>>>>>         val state = oldState.get
>>>>>         println("State timed out for key: " + state.dateTime)
>>>>>         oldState.remove()
>>>>>         state
>>>>>       } else {
>>>>>         val state = oldState.get
>>>>>         state.count = state.count + 1
>>>>>         oldState.update(state)
>>>>>         oldState.setTimeoutDuration("1 minutes")
>>>>>         oldState.get
>>>>>       }
>>>>>     }
>>>>>   }
>>>>>
>>>>>   def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = {
>>>>>     UserGroupInformation.setLoginUser(
>>>>>       UserGroupInformation.createRemoteUser("hduser")
>>>>>     )
>>>>>
>>>>>     val builder = SparkSession
>>>>>       .builder()
>>>>>       .appName(applicationName)
>>>>>
>>>>>     if (isLocal) {
>>>>>       builder.config("spark.master", "local[2]")
>>>>>     }
>>>>>
>>>>>     builder.getOrCreate()
>>>>>   }
>>>>>
>>>>>   def manageStreamingQueries(spark: SparkSession): Unit = {
>>>>>
>>>>>     val sparkQueryListener = new QueryListener()
>>>>>     spark.streams.addListener(sparkQueryListener)
>>>>>
>>>>>     val shutdownMarker: String = "/tmp/stop_job"
>>>>>
>>>>>     val timeoutInMilliSeconds = 60000
>>>>>     while (!spark.streams.active.isEmpty) {
>>>>>       Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
>>>>>         case Success(result) =>
>>>>>           if (result) {
>>>>>             println("A streaming query was terminated successfully")
>>>>>             spark.streams.resetTerminated()
>>>>>           }
>>>>>         case Failure(e) =>
>>>>>           println("Query failed with message: " + e.getMessage)
>>>>>           e.printStackTrace()
>>>>>           spark.streams.resetTerminated()
>>>>>       }
>>>>>
>>>>>       if (checkMarker(shutdownMarker)) {
>>>>>         spark.streams.active.foreach(query => {
>>>>>           println(s"Stopping streaming query: ${query.id}")
>>>>>           query.stop()
>>>>>         })
>>>>>         spark.stop()
>>>>>         removeMarker(shutdownMarker)
>>>>>       }
>>>>>     }
>>>>>     assert(spark.streams.active.isEmpty)
>>>>>     spark.streams.removeListener(sparkQueryListener)
>>>>>   }
>>>>>
>>>>>   def checkMarker(markerFile: String): Boolean = {
>>>>>     val fs = FileSystem.get(new Configuration())
>>>>>     fs.exists(new Path(markerFile))
>>>>>   }
>>>>>
>>>>>   def removeMarker(markerFile: String): Unit = {
>>>>>     val fs = FileSystem.get(new Configuration())
>>>>>     fs.delete(new Path(markerFile), true)
>>>>>   }
>>>>>
>>>>> }
>>>>>
>>>>> case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> package com.myorg
>>>>>
>>>>> import org.apache.spark.sql.streaming.StreamingQueryListener
>>>>>
>>>>> class QueryListener extends StreamingQueryListener {
>>>>>
>>>>>   def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
>>>>>
>>>>>   def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>>>>>     if (event.progress.numInputRows != 0) {
>>>>>       println(
>>>>>         s"InputRows: ${event.progress.numInputRows}"
>>>>>       )
>>>>>     }
>>>>>   }
>>>>>
>>>>>   def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
>>>>>     println(s"Query with id ${event.id} terminated")
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>>>>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
>>>>>   <modelVersion>4.0.0</modelVersion>
>>>>>   <groupId>com.myorg</groupId>
>>>>>   <artifactId>spark-3-conversion</artifactId>
>>>>>   <packaging>jar</packaging>
>>>>>   <version>1.0-SNAPSHOT</version>
>>>>>   <name>spark-3-conversion</name>
>>>>>   <url>http://maven.apache.org</url>
>>>>>
>>>>>   <properties>
>>>>>     <spark.version>3.0.0</spark.version>
>>>>>     <scala.binary.version>2.12</scala.binary.version>
>>>>>     <scala.version>2.12.10</scala.version>
>>>>>     <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
>>>>>     <skipTests>true</skipTests>
>>>>>     <maven.compiler.source>1.5</maven.compiler.source>
>>>>>     <maven.compiler.target>1.5</maven.compiler.target>
>>>>>     <encoding>UTF-8</encoding>
>>>>>   </properties>
>>>>>
>>>>>
>>>>>   <dependencies>
>>>>>     <dependency>
>>>>>       <groupId>org.scala-lang</groupId>
>>>>>       <artifactId>scala-library</artifactId>
>>>>>       <version>${scala.version}</version>
>>>>>     </dependency>
>>>>>
>>>>>     <dependency>
>>>>>       <groupId>org.apache.spark</groupId>
>>>>>       <artifactId>spark-core_${scala.binary.version}</artifactId>
>>>>>       <version>${spark.version}</version>
>>>>>       <scope>provided</scope>
>>>>>     </dependency>
>>>>>     <dependency>
>>>>>       <groupId>org.apache.spark</groupId>
>>>>>       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
>>>>>       <version>${spark.version}</version>
>>>>>       <scope>provided</scope>
>>>>>     </dependency>
>>>>>     <dependency>
>>>>>       <groupId>org.apache.spark</groupId>
>>>>>       <artifactId>spark-sql_${scala.binary.version}</artifactId>
>>>>>       <version>${spark.version}</version>
>>>>>       <scope>provided</scope>
>>>>>     </dependency>
>>>>>     <dependency>
>>>>>       <groupId>org.apache.spark</groupId>
>>>>>       <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
>>>>>       <version>${spark.version}</version>
>>>>>     </dependency>
>>>>>
>>>>>     <dependency>
>>>>>       <groupId>org.apache.spark</groupId>
>>>>>       <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>>>       <version>${spark.version}</version>
>>>>>     </dependency>
>>>>>
>>>>>     <dependency>
>>>>>       <groupId>org.slf4j</groupId>
>>>>>       <artifactId>slf4j-log4j12</artifactId>
>>>>>       <version>1.7.7</version>
>>>>>       <scope>runtime</scope>
>>>>>     </dependency>
>>>>>     <dependency>
>>>>>       <groupId>log4j</groupId>
>>>>>       <artifactId>log4j</artifactId>
>>>>>       <version>1.2.17</version>
>>>>>       <scope>compile</scope>
>>>>>     </dependency>
>>>>>
>>>>>     <dependency>
>>>>>       <groupId>org.scalatest</groupId>
>>>>>       <artifactId>scalatest_${scala.binary.version}</artifactId>
>>>>>       <version>3.0.7</version>
>>>>>       <scope>test</scope>
>>>>>     </dependency>
>>>>>
>>>>>     <dependency>
>>>>>       <groupId>junit</groupId>
>>>>>       <artifactId>junit</artifactId>
>>>>>       <version>3.8.1</version>
>>>>>       <scope>test</scope>
>>>>>     </dependency>
>>>>>   </dependencies>
>>>>>
>>>>>   <build>
>>>>>     <plugins>
>>>>>       <plugin>
>>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>>         <artifactId>maven-shade-plugin</artifactId>
>>>>>         <version>3.0.0</version>
>>>>>         <executions>
>>>>>           <!-- Run shade goal on install phase -->
>>>>>           <execution>
>>>>>             <phase>install</phase>
>>>>>             <goals>
>>>>>               <goal>shade</goal>
>>>>>             </goals>
>>>>>           </execution>
>>>>>         </executions>
>>>>>       </plugin>
>>>>>
>>>>>       <!-- Scala Compiler -->
>>>>>       <plugin>
>>>>>         <groupId>net.alchim31.maven</groupId>
>>>>>         <artifactId>scala-maven-plugin</artifactId>
>>>>>         <version>3.2.2</version>
>>>>>         <executions>
>>>>>           <execution>
>>>>>             <goals>
>>>>>               <goal>compile</goal>
>>>>>               <goal>testCompile</goal>
>>>>>             </goals>
>>>>>           </execution>
>>>>>         </executions>
>>>>>       </plugin>
>>>>>
>>>>>       <plugin>
>>>>>         <groupId>org.codehaus.mojo</groupId>
>>>>>         <artifactId>build-helper-maven-plugin</artifactId>
>>>>>         <version>1.7</version>
>>>>>         <executions>
>>>>>           <!-- Add src/main/scala to eclipse build path -->
>>>>>           <execution>
>>>>>             <id>add-source</id>
>>>>>             <phase>generate-sources</phase>
>>>>>             <goals>
>>>>>               <goal>add-source</goal>
>>>>>             </goals>
>>>>>             <configuration>
>>>>>               <sources>
>>>>>                 <source>src/main/scala</source>
>>>>>               </sources>
>>>>>             </configuration>
>>>>>           </execution>
>>>>>           <!-- Add src/test/scala to eclipse build path -->
>>>>>           <execution>
>>>>>             <id>add-test-source</id>
>>>>>             <phase>generate-test-sources</phase>
>>>>>             <goals>
>>>>>               <goal>add-test-source</goal>
>>>>>             </goals>
>>>>>             <configuration>
>>>>>               <sources>
>>>>>                 <source>src/test/scala</source>
>>>>>               </sources>
>>>>>             </configuration>
>>>>>           </execution>
>>>>>         </executions>
>>>>>       </plugin>
>>>>>
>>>>>       <!-- we disable surefire and enable scalatest so that maven can run our tests -->
>>>>>       <plugin>
>>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>>         <artifactId>maven-surefire-plugin</artifactId>
>>>>>         <version>2.7</version>
>>>>>         <configuration>
>>>>>           <skipTests>true</skipTests>
>>>>>         </configuration>
>>>>>       </plugin>
>>>>>       <plugin>
>>>>>         <groupId>org.scalatest</groupId>
>>>>>         <artifactId>scalatest-maven-plugin</artifactId>
>>>>>         <version>1.0</version>
>>>>>         <configuration>
>>>>>           <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
>>>>>           <junitxml>.</junitxml>
>>>>>           <filereports>WDF TestSuite.txt</filereports>
>>>>>         </configuration>
>>>>>         <executions>
>>>>>           <execution>
>>>>>             <id>test</id>
>>>>>             <goals>
>>>>>               <goal>test</goal>
>>>>>             </goals>
>>>>>           </execution>
>>>>>         </executions>
>>>>>       </plugin>
>>>>>       <plugin>
>>>>>         <groupId>org.scalastyle</groupId>
>>>>>         <artifactId>scalastyle-maven-plugin</artifactId>
>>>>>         <version>1.0.0</version>
>>>>>         <configuration>
>>>>>           <verbose>false</verbose>
>>>>>           <failOnViolation>true</failOnViolation>
>>>>>           <includeTestSourceDirectory>true</includeTestSourceDirectory>
>>>>>           <failOnWarning>false</failOnWarning>
>>>>>           <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
>>>>>           <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
>>>>>           <configLocation>lib/scalastyle_config.xml</configLocation>
>>>>>           <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
>>>>>           <outputEncoding>UTF-8</outputEncoding>
>>>>>         </configuration>
>>>>>         <executions>
>>>>>           <execution>
>>>>>             <goals>
>>>>>               <goal>check</goal>
>>>>>             </goals>
>>>>>           </execution>
>>>>>         </executions>
>>>>>       </plugin>
>>>>>       <plugin>
>>>>>         <groupId>org.sonarsource.scanner.maven</groupId>
>>>>>         <artifactId>sonar-maven-plugin</artifactId>
>>>>>         <version>3.6.0.1398</version>
>>>>>       </plugin>
>>>>>
>>>>>     </plugins>
>>>>>   </build>
>>>>> </project>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jan 13, 2021 at 11:26 PM Eric Beabes <mailinglist...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Ok. I will work on creating a reproducible app. Thanks.
>>>>>>
>>>>>> On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi <
>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>
>>>>>>> Just reached this thread. +1 on creating a simple reproducer app, and
>>>>>>> I suggest creating a JIRA with the full driver and executor logs attached.
>>>>>>> Ping me on the JIRA and I'll pick this up right away...
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> G
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim <
>>>>>>> kabhwan.opensou...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Would you mind if I ask for a simple reproducer? Would be nice if
>>>>>>>> you could create a repository in Github and push the code including the
>>>>>>>> build script.
>>>>>>>>
>>>>>>>> Thanks in advance!
>>>>>>>>
>>>>>>>> On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes <
>>>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I tried both. First tried 3.0.0. That didn't work so I tried 3.1.0.
>>>>>>>>>
>>>>>>>>> On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <
>>>>>>>>> kabhwan.opensou...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Which exact Spark version did you use? Did you make sure the
>>>>>>>>>> Spark version and the version of the spark-sql-kafka artifact are the
>>>>>>>>>> same? (I ask because you said you used Spark 3.0, but the
>>>>>>>>>> spark-sql-kafka dependency pointed to 3.1.0.)
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes <
>>>>>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.sql.streaming.StreamingQueryException: Data source v2 streaming sinks does not support Update mode.
>>>>>>>>>>> === Streaming Query ===
>>>>>>>>>>> Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId = 62410f05-db59-4026-83fc-439a79b01c29]
>>>>>>>>>>> Current Committed Offsets: {}
>>>>>>>>>>> Current Available Offsets: {}
>>>>>>>>>>> Current State: INITIALIZING
>>>>>>>>>>> Thread State: RUNNABLE
>>>>>>>>>>>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
>>>>>>>>>>>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Data source v2 streaming sinks does not support Update mode.
>>>>>>>>>>>   at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
>>>>>>>>>>>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
>>>>>>>>>>>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
>>>>>>>>>>>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
>>>>>>>>>>>   ... 1 more
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> *Please see the attached image for more information.*
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski <ja...@japila.pl>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> Can you post the whole message? I'm trying to find what might
>>>>>>>>>>>> be causing it. A small reproducible example would be of help too. 
>>>>>>>>>>>> Thank you.
>>>>>>>>>>>>
>>>>>>>>>>>> Pozdrawiam,
>>>>>>>>>>>> Jacek Laskowski
>>>>>>>>>>>> ----
>>>>>>>>>>>> https://about.me/JacekLaskowski
>>>>>>>>>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>>>>>>>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes <
>>>>>>>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Trying to port my Spark 2.4 based (Structured) streaming
>>>>>>>>>>>>> application to Spark 3.0. I compiled it using the dependency 
>>>>>>>>>>>>> given below:
>>>>>>>>>>>>>
>>>>>>>>>>>>> <dependency>
>>>>>>>>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>>>>>>>>     <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>>>>>>>>>>>     <version>3.1.0</version>
>>>>>>>>>>>>> </dependency>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Every time I run it under Spark 3.0, I get this message: *Data
>>>>>>>>>>>>> source v2 streaming sinks does not support Update mode*
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am using '*mapGroupsWithState*' so as per this link (
>>>>>>>>>>>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
>>>>>>>>>>>>> the only supported Output mode is "*Update*".
>>>>>>>>>>>>>
>>>>>>>>>>>>> My Sink is a Kafka topic so I am using this:
>>>>>>>>>>>>>
>>>>>>>>>>>>> .writeStream
>>>>>>>>>>>>> .format("kafka")
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> What am I missing?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
