Confirmed. The cluster admin said his team installed the latest release from Cloudera, which ships with Spark 3.0.0-preview2. They are going to try upgrading it to the community edition of Spark 3.1.0.

Thanks, Jungtaek, for the tip. Greatly appreciate it.
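Once the upgrade lands, a quick way to confirm it took effect is to check the version the running session actually reports and fail fast on a preview build — a minimal sketch (the app name is a placeholder; the check itself is just a string test on spark.version):

    import org.apache.spark.sql.SparkSession

    object VersionCheck {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("version-check").getOrCreate()
        // 3.0.0-preview/-preview2 restricted Update mode for DSv2 sinks (per
        // Jungtaek's note below), so refuse to run on any "-preview" build.
        require(!spark.version.contains("preview"),
          s"Refusing to run on preview build: ${spark.version}")
        println(s"Running on Spark ${spark.version}")
        spark.stop()
      }
    }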
On Tue, Jan 19, 2021 at 8:45 AM Eric Beabes <mailinglist...@gmail.com> wrote:

> "Could you please make sure you're not using 3.0.0-preview."

This could be the reason. I will check with our Hadoop cluster administrator. It's quite possible that they installed the "preview" build. Yes, the code works in the local dev environment.

On Tue, Jan 19, 2021 at 5:29 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

I see no issue running this code in local dev (I changed the scope of the Spark artifacts to "compile", of course).

Could you please make sure you're not using "3.0.0-preview"? In 3.0.0-preview, Update mode was restricted (as the error message says); the restriction was reverted before Spark 3.0.0 was released. Clearing the Spark artifacts in your .m2 cache may work.

On Tue, Jan 19, 2021 at 8:50 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

And please include some test data as well. I quickly looked through the code, and it may require a specific record format.

On Tue, Jan 19, 2021 at 12:10 AM German Schiavon <gschiavonsp...@gmail.com> wrote:

Hi,

This is the jira: <https://issues.apache.org/jira/projects/SPARK/summary>. Regarding the repo, I believe you can just commit it to your personal repo and that should be it.

Regards

On Mon, 18 Jan 2021 at 15:46, Eric Beabes <mailinglist...@gmail.com> wrote:

Sorry, can you please tell me where to create the JIRA? Also, is there a specific GitHub repository I need to commit the code into, or just our own? Please let me know. Thanks.

On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Thank you. As we've asked, could you please create a jira and commit the code to GitHub? It would speed things up a lot.

G

On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <mailinglist...@gmail.com> wrote:

Here's a very simple reproducer app. I've attached three files: SparkTest.scala, QueryListener.scala, and pom.xml. Copying the contents into the email as well:

SparkTest.scala:

    package com.myorg

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.security.UserGroupInformation
    import org.apache.kafka.clients.producer.ProducerConfig
    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
    import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

    import scala.util.{Failure, Success, Try}

    object Spark3Test {

      val isLocal = false

      implicit val stringEncoder: Encoder[String] = Encoders.STRING
      implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]

      // Each record is keyed on its date-time substring at offsets 21..39.
      val START_DATE_INDEX = 21
      val END_DATE_INDEX = 40

      def main(args: Array[String]): Unit = {

        val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal)
        spark.sparkContext.setLogLevel("WARN")

        readKafkaStream(spark)
          .groupByKey(row => row.substring(START_DATE_INDEX, END_DATE_INDEX))
          .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
            updateAcrossEvents
          )
          .filter(row => !row.inProgress)
          .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
          .writeStream
          .format("kafka")
          .option(
            s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
            "10.29.42.141:9092" // "localhost:9092"
          )
          .option("topic", "spark3test")
          .option("checkpointLocation", "/tmp/checkpoint_5")
          .outputMode("update")
          .start()
        manageStreamingQueries(spark)
      }

      def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
        sparkSession.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "10.29.42.141:9092")
          .option("subscribe", "inputTopic")
          .option("startingOffsets", "latest")
          .option("failOnDataLoss", "false")
          .option("kafkaConsumer.pollTimeoutMs", "120000")
          .load()
          .selectExpr("CAST(value AS STRING)")
          .as[String](Encoders.STRING)
      }

      def updateAcrossEvents(key: String, inputs: Iterator[String],
                             oldState: GroupState[MyState]): MyState = {
        if (!oldState.exists) {
          // First event for this key: initialize state and arm the timeout.
          println(key)
          val state = MyState(key)
          oldState.update(state)
          oldState.setTimeoutDuration("1 minutes")
          oldState.get
        } else if (oldState.hasTimedOut) {
          // No events within the timeout: mark complete and drop the state.
          oldState.get.inProgress = false
          val state = oldState.get
          println("State timed out for key: " + state.dateTime)
          oldState.remove()
          state
        } else {
          val state = oldState.get
          state.count = state.count + 1
          oldState.update(state)
          oldState.setTimeoutDuration("1 minutes")
          oldState.get
        }
      }

      def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = {
        UserGroupInformation.setLoginUser(
          UserGroupInformation.createRemoteUser("hduser")
        )

        val builder = SparkSession
          .builder()
          .appName(applicationName)

        if (isLocal) {
          builder.config("spark.master", "local[2]")
        }

        builder.getOrCreate()
      }

      def manageStreamingQueries(spark: SparkSession): Unit = {

        val sparkQueryListener = new QueryListener()
        spark.streams.addListener(sparkQueryListener)

        // Presence of this marker file signals a graceful shutdown.
        val shutdownMarker: String = "/tmp/stop_job"

        val timeoutInMilliSeconds = 60000
        while (!spark.streams.active.isEmpty) {
          Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
            case Success(result) =>
              if (result) {
                println("A streaming query was terminated successfully")
                spark.streams.resetTerminated()
              }
            case Failure(e) =>
              println("Query failed with message: " + e.getMessage)
              e.printStackTrace()
              spark.streams.resetTerminated()
          }

          if (checkMarker(shutdownMarker)) {
            spark.streams.active.foreach(query => {
              println(s"Stopping streaming query: ${query.id}")
              query.stop()
            })
            spark.stop()
            removeMarker(shutdownMarker)
          }
        }
        assert(spark.streams.active.isEmpty)
        spark.streams.removeListener(sparkQueryListener)
      }

      def checkMarker(markerFile: String): Boolean = {
        val fs = FileSystem.get(new Configuration())
        fs.exists(new Path(markerFile))
      }

      def removeMarker(markerFile: String): Unit = {
        val fs = FileSystem.get(new Configuration())
        fs.delete(new Path(markerFile), true)
      }
    }

    case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)

QueryListener.scala:

    package com.myorg

    import org.apache.spark.sql.streaming.StreamingQueryListener

    class QueryListener extends StreamingQueryListener {

      def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

      def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        if (event.progress.numInputRows != 0) {
          println(s"InputRows: ${event.progress.numInputRows}")
        }
      }

      def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
        println(s"Query with id ${event.id} terminated")
      }
    }
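Regarding Jungtaek's note above about test data: the reproducer keys each record on row.substring(21, 40), i.e. characters 21 through 39, which is exactly the width of a "yyyy-MM-dd HH:mm:ss" timestamp. A hypothetical generator for records matching those offsets — the prefix and payload are made up; only the offsets come from the code above:

    object TestDataGen {
      def main(args: Array[String]): Unit = {
        // 21 filler characters, then a 19-character date-time at offsets 21..39,
        // then an arbitrary payload; substring(21, 40) extracts the key.
        val prefix = "x" * 21
        val records = Seq(
          prefix + "2021-01-18 10:00:00" + "|payload-1",
          prefix + "2021-01-18 10:00:00" + "|payload-2",
          prefix + "2021-01-18 10:01:00" + "|payload-3"
        )
        records.foreach(println) // pipe into a console producer, for example
        // sanity check: the extracted key is the embedded timestamp
        assert(records.head.substring(21, 40) == "2021-01-18 10:00:00")
      }
    }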
pom.xml:

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                                 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.myorg</groupId>
      <artifactId>spark-3-conversion</artifactId>
      <packaging>jar</packaging>
      <version>1.0-SNAPSHOT</version>
      <name>spark-3-conversion</name>
      <url>http://maven.apache.org</url>

      <properties>
        <spark.version>3.0.0</spark.version>
        <scala.binary.version>2.12</scala.binary.version>
        <scala.version>2.12.10</scala.version>
        <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
        <skipTests>true</skipTests>
        <maven.compiler.source>1.5</maven.compiler.source>
        <maven.compiler.target>1.5</maven.compiler.target>
        <encoding>UTF-8</encoding>
      </properties>

      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
        </dependency>

        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_${scala.binary.version}</artifactId>
          <version>${spark.version}</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_${scala.binary.version}</artifactId>
          <version>${spark.version}</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_${scala.binary.version}</artifactId>
          <version>${spark.version}</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>

        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
          <version>1.7.7</version>
          <scope>runtime</scope>
        </dependency>
        <dependency>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
          <version>1.2.17</version>
          <scope>compile</scope>
        </dependency>

        <dependency>
          <groupId>org.scalatest</groupId>
          <artifactId>scalatest_${scala.binary.version}</artifactId>
          <version>3.0.7</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
      </dependencies>

      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
              <!-- Run shade goal on package phase -->
              <execution>
                <phase>install</phase>
                <goals>
                  <goal>shade</goal>
                </goals>
              </execution>
            </executions>
          </plugin>

          <!-- Scala compiler -->
          <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
              <execution>
                <goals>
                  <goal>compile</goal>
                  <goal>testCompile</goal>
                </goals>
              </execution>
            </executions>
          </plugin>

          <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>build-helper-maven-plugin</artifactId>
            <version>1.7</version>
            <executions>
              <!-- Add src/main/scala to eclipse build path -->
              <execution>
                <id>add-source</id>
                <phase>generate-sources</phase>
                <goals>
                  <goal>add-source</goal>
                </goals>
                <configuration>
                  <sources>
                    <source>src/main/scala</source>
                  </sources>
                </configuration>
              </execution>
              <!-- Add src/test/scala to eclipse build path -->
              <execution>
                <id>add-test-source</id>
                <phase>generate-test-sources</phase>
                <goals>
                  <goal>add-test-source</goal>
                </goals>
                <configuration>
                  <sources>
                    <source>src/test/scala</source>
                  </sources>
                </configuration>
              </execution>
            </executions>
          </plugin>

          <!-- Disable surefire and enable scalatest so that Maven can run our tests -->
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.7</version>
            <configuration>
              <skipTests>true</skipTests>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest-maven-plugin</artifactId>
            <version>1.0</version>
            <configuration>
              <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
              <junitxml>.</junitxml>
              <filereports>WDF TestSuite.txt</filereports>
            </configuration>
            <executions>
              <execution>
                <id>test</id>
                <goals>
                  <goal>test</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
          <plugin>
            <groupId>org.scalastyle</groupId>
            <artifactId>scalastyle-maven-plugin</artifactId>
            <version>1.0.0</version>
            <configuration>
              <verbose>false</verbose>
              <failOnViolation>true</failOnViolation>
              <includeTestSourceDirectory>true</includeTestSourceDirectory>
              <failOnWarning>false</failOnWarning>
              <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
              <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
              <configLocation>lib/scalastyle_config.xml</configLocation>
              <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
              <outputEncoding>UTF-8</outputEncoding>
            </configuration>
            <executions>
              <execution>
                <goals>
                  <goal>check</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
          <plugin>
            <groupId>org.sonarsource.scanner.maven</groupId>
            <artifactId>sonar-maven-plugin</artifactId>
            <version>3.6.0.1398</version>
          </plugin>
        </plugins>
      </build>
    </project>

On Wed, Jan 13, 2021 at 11:26 PM Eric Beabes <mailinglist...@gmail.com> wrote:

Ok. I will work on creating a reproducible app. Thanks.

On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Just reached this thread. +1 on creating a simple reproducer app, and I suggest creating a jira with the full driver and executor logs attached. Ping me on the jira and I'll pick it up right away...

Thanks!

G

On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

Would you mind if I asked for a simple reproducer? It would be nice if you could create a repository on GitHub and push the code, including the build script.

Thanks in advance!

On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes <mailinglist...@gmail.com> wrote:

I tried both. First I tried 3.0.0; that didn't work, so I tried 3.1.0.
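When two artifact versions are in play like this (see Jungtaek's question just below), one way to see which spark-sql-kafka jar actually made it onto the classpath is to read its manifest version at runtime. A small sketch, assuming the jar's manifest carries an Implementation-Version — it may not for every build:

    import org.apache.spark.sql.SparkSession

    object VersionAudit {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("version-audit").getOrCreate()
        // Version of Spark itself, as the cluster runs it.
        println(s"spark-core: ${spark.version}")
        // Version of the spark-sql-kafka jar, read from a class it provides;
        // getPackage/getImplementationVersion may be null if the manifest lacks it.
        val pkg = Class.forName("org.apache.spark.sql.kafka010.KafkaSourceProvider").getPackage
        val kafkaVersion = Option(pkg).flatMap(p => Option(p.getImplementationVersion))
          .getOrElse("unknown (no Implementation-Version in manifest)")
        println(s"spark-sql-kafka-0-10: $kafkaVersion")
        spark.stop()
      }
    }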
On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

Which exact Spark version did you use? Did you make sure the version of Spark and the version of the spark-sql-kafka artifact are the same? (I ask because you said you used Spark 3.0, but the spark-sql-kafka dependency pointed to 3.1.0.)

On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes <mailinglist...@gmail.com> wrote:

    org.apache.spark.sql.streaming.StreamingQueryException: Data source v2 streaming sinks does not support Update mode.
    === Streaming Query ===
    Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId = 62410f05-db59-4026-83fc-439a79b01c29]
    Current Committed Offsets: {}
    Current Available Offsets: {}

    Current State: INITIALIZING
    Thread State: RUNNABLE
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
    Caused by: java.lang.IllegalArgumentException: Data source v2 streaming sinks does not support Update mode.
        at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
        ... 1 more

Please see the attached image for more information.

On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski <ja...@japila.pl> wrote:

Hi,

Can you post the whole message? I'm trying to find out what might be causing it. A small reproducible example would help too. Thank you.

Regards,
Jacek Laskowski
----
https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski
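A side note on reading traces like the one above: the interesting IllegalArgumentException sits nested under the StreamingQueryException, so when logging it helps to walk the cause chain. A small sketch with a hypothetical helper, not part of the app above:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.StreamingQueryException

    object RootCause {
      // Walk the cause chain to the bottom; for the trace above this yields the
      // IllegalArgumentException thrown by StreamExecution.createStreamingWrite.
      def rootCause(t: Throwable): Throwable = {
        var cur: Throwable = t
        while (cur.getCause != null && cur.getCause != cur) cur = cur.getCause
        cur
      }

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("root-cause-demo").getOrCreate()
        try spark.streams.awaitAnyTermination()
        catch {
          case e: StreamingQueryException =>
            println(s"root cause: ${rootCause(e)}")
            throw e
        }
      }
    }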
On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes <mailinglist...@gmail.com> wrote:

I am trying to port my Spark 2.4 based (Structured) Streaming application to Spark 3.0. I compiled it using the dependency given below:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
      <version>3.1.0</version>
    </dependency>

Every time I run it under Spark 3.0, I get this message: "Data source v2 streaming sinks does not support Update mode"

I am using 'mapGroupsWithState', so as per this link (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes), the only supported output mode is "Update".

My sink is a Kafka topic, so I am using this:

    .writeStream
    .format("kafka")

What am I missing?
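To make the constraint explicit for anyone who lands on this thread: mapGroupsWithState supports only Update output mode, and per Jungtaek's note the Kafka sink accepts Update mode on Spark 3.0.0 final — the restriction existed only in the 3.0.0-preview builds. A stripped-down sketch of that combination; the broker, topics, and checkpoint path are placeholders:

    package com.myorg

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

    object UpdateModeSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("update-mode-sketch").getOrCreate()
        import spark.implicits._

        // A Dataset[String] exposes a single column named "value",
        // which is exactly what the Kafka sink expects.
        val events = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
          .option("subscribe", "input")                        // placeholder
          .load()
          .selectExpr("CAST(value AS STRING)")
          .as[String]

        // Running count per key, carried across micro-batches in GroupState.
        val counts = events
          .groupByKey(identity)
          .mapGroupsWithState(GroupStateTimeout.NoTimeout)(
            (key: String, rows: Iterator[String], state: GroupState[Long]) => {
              val total = state.getOption.getOrElse(0L) + rows.size
              state.update(total)
              s"$key:$total"
            })

        counts.writeStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
          .option("topic", "output")                           // placeholder
          .option("checkpointLocation", "/tmp/ckpt")           // placeholder
          .outputMode("update") // the only mode mapGroupsWithState supports
          .start()
          .awaitTermination()
      }
    }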