Will do, thanks!
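Since the fix in this thread hinged on which Spark version the cluster actually ran (3.0.0-preview2, while the app was built against 3.1.0), a quick way to confirm the runtime version is to log it from inside a job. A minimal sketch, not from the thread, assuming nothing beyond a SparkSession:

import org.apache.spark.sql.SparkSession

object VersionCheck {
  def main(args: Array[String]): Unit = {
    // The runtime version can differ from the version the app was compiled
    // against; spark.version reports what is actually on the cluster.
    val spark = SparkSession.builder().appName("VersionCheck").getOrCreate()
    println(s"Runtime Spark version: ${spark.version}")
    spark.stop()
  }
}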
On Tue, Jan 19, 2021 at 1:39 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Thanks for double-checking the version. Please report back on whether the 3.1 version works or not.

G

On Tue, 19 Jan 2021, 07:41 Eric Beabes, <mailinglist...@gmail.com> wrote:

Confirmed. The cluster admin said his team installed the latest version from Cloudera, which comes with Spark 3.0.0-preview2. They are going to try to upgrade it to the community edition of Spark 3.1.0.

Thanks, Jungtaek, for the tip. Greatly appreciated.

On Tue, Jan 19, 2021 at 8:45 AM Eric Beabes <mailinglist...@gmail.com> wrote:

"Could you please make sure you're not using 3.0.0-preview?"

This could be the reason. I will check with our Hadoop cluster administrator. It's quite possible that they installed the "preview" version. Yes, the code works in the local dev environment.

On Tue, Jan 19, 2021 at 5:29 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

I see no issue running this code in local dev (I changed the scope of the Spark artifacts to "compile", of course).

Could you please make sure you're not using "3.0.0-preview"? In 3.0.0-preview, Update mode was restricted (as the error message says), and the restriction was reverted before Spark 3.0.0 was released. Clearing the Spark artifacts in your .m2 cache may help.

On Tue, Jan 19, 2021 at 8:50 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

Please also include some test data. I quickly looked through the code, and it may require a specific record format.

On Tue, Jan 19, 2021 at 12:10 AM German Schiavon <gschiavonsp...@gmail.com> wrote:

Hi,

This is the JIRA <https://issues.apache.org/jira/projects/SPARK/summary>, and regarding the repo, I believe you can just commit it to your personal repo and that should be it.

Regards

On Mon, 18 Jan 2021 at 15:46, Eric Beabes <mailinglist...@gmail.com> wrote:

Sorry, can you please tell me where to create the JIRA? Also, is there a specific GitHub repository I need to commit the code into, or just our own? Please let me know. Thanks.

On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Thank you. As we've asked, could you please create a JIRA and commit the code to GitHub? It would speed things up a lot.

G

On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <mailinglist...@gmail.com> wrote:

Here's a very simple reproducer app. I've attached 3 files: SparkTest.scala, QueryListener.scala & pom.xml.
Copying the contents into the email as well:

SparkTest.scala:

package com.myorg

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

import scala.util.{Failure, Success, Try}

object Spark3Test {

  val isLocal = false

  implicit val stringEncoder: Encoder[String] = Encoders.STRING
  implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]

  val START_DATE_INDEX = 21
  val END_DATE_INDEX = 40

  def main(args: Array[String]) {

    val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal)
    spark.sparkContext.setLogLevel("WARN")

    readKafkaStream(spark)
      .groupByKey(row => {
        row.substring(START_DATE_INDEX, END_DATE_INDEX)
      })
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )
      .filter(row => !row.inProgress)
      .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
      .writeStream
      .format("kafka")
      .option(
        s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
        "10.29.42.141:9092"
        // "localhost:9092"
      )
      .option("topic", "spark3test")
      .option("checkpointLocation", "/tmp/checkpoint_5")
      .outputMode("update")
      .start()
    manageStreamingQueries(spark)
  }

  def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
    val stream = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.29.42.141:9092")
      .option("subscribe", "inputTopic")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafkaConsumer.pollTimeoutMs", "120000")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String](Encoders.STRING)
    stream
  }

  def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: GroupState[MyState]): MyState = {
    if (!oldState.exists) {
      println(key)
      val state = MyState(key)
      oldState.update(state)
      oldState.setTimeoutDuration("1 minutes")
      oldState.get
    } else {
      if (oldState.hasTimedOut) {
        oldState.get.inProgress = false
        val state = oldState.get
        println("State timed out for key: " + state.dateTime)
        oldState.remove()
        state
      } else {
        val state = oldState.get
        state.count = state.count + 1
        oldState.update(state)
        oldState.setTimeoutDuration("1 minutes")
        oldState.get
      }
    }
  }

  def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = {
    UserGroupInformation.setLoginUser(
      UserGroupInformation.createRemoteUser("hduser")
    )

    val builder = SparkSession
      .builder()
      .appName(applicationName)

    if (isLocal) {
      builder.config("spark.master", "local[2]")
    }

    builder.getOrCreate()
  }

  def manageStreamingQueries(spark: SparkSession): Unit = {

    val sparkQueryListener = new QueryListener()
    spark.streams.addListener(sparkQueryListener)

    val shutdownMarker: String = "/tmp/stop_job"

    val timeoutInMilliSeconds = 60000
    while (!spark.streams.active.isEmpty) {
      Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
        case Success(result) =>
          if (result) {
            println("A streaming query was terminated successfully")
            spark.streams.resetTerminated()
          }
        case Failure(e) =>
          println("Query failed with message: " + e.getMessage)
          e.printStackTrace()
          spark.streams.resetTerminated()
      }

      if (checkMarker(shutdownMarker)) {
        spark.streams.active.foreach(query => {
          println(s"Stopping streaming query: ${query.id}")
          query.stop()
        })
        spark.stop()
        removeMarker(shutdownMarker)
      }
    }
    assert(spark.streams.active.isEmpty)
    spark.streams.removeListener(sparkQueryListener)
  }

  def checkMarker(markerFile: String): Boolean = {
    val fs = FileSystem.get(new Configuration())
    fs.exists(new Path(markerFile))
  }

  def removeMarker(markerFile: String): Unit = {
    val fs = FileSystem.get(new Configuration())
    fs.delete(new Path(markerFile), true)
  }
}

case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)

QueryListener.scala:

package com.myorg

import org.apache.spark.sql.streaming.StreamingQueryListener

class QueryListener extends StreamingQueryListener {

  def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    if (event.progress.numInputRows != 0) {
      println(s"InputRows: ${event.progress.numInputRows}")
    }
  }

  def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    println(s"Query with id ${event.id} terminated")
  }
}

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.myorg</groupId>
  <artifactId>spark-3-conversion</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>spark-3-conversion</name>
  <url>http://maven.apache.org</url>

  <properties>
    <spark.version>3.0.0</spark.version>
    <scala.binary.version>2.12</scala.binary.version>
    <scala.version>2.12.10</scala.version>
    <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
    <skipTests>true</skipTests>
    <maven.compiler.source>1.5</maven.compiler.source>
    <maven.compiler.target>1.5</maven.compiler.target>
    <encoding>UTF-8</encoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.binary.version}</artifactId>
      <version>3.0.7</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>install</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <!-- Scala compiler -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>1.7</version>
        <executions>
          <!-- Add src/main/scala to eclipse build path -->
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/main/scala</source>
              </sources>
            </configuration>
          </execution>
          <!-- Add src/test/scala to eclipse build path -->
          <execution>
            <id>add-test-source</id>
            <phase>generate-test-sources</phase>
            <goals>
              <goal>add-test-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/test/scala</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <!-- We disable surefire and enable scalatest so that Maven can run our tests -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.7</version>
        <configuration>
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>1.0</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>WDF TestSuite.txt</filereports>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.scalastyle</groupId>
        <artifactId>scalastyle-maven-plugin</artifactId>
        <version>1.0.0</version>
        <configuration>
          <verbose>false</verbose>
          <failOnViolation>true</failOnViolation>
          <includeTestSourceDirectory>true</includeTestSourceDirectory>
          <failOnWarning>false</failOnWarning>
          <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
          <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
          <configLocation>lib/scalastyle_config.xml</configLocation>
          <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
          <outputEncoding>UTF-8</outputEncoding>
        </configuration>
        <executions>
          <execution>
            <goals>
              <goal>check</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.sonarsource.scanner.maven</groupId>
        <artifactId>sonar-maven-plugin</artifactId>
        <version>3.6.0.1398</version>
      </plugin>
    </plugins>
  </build>
</project>

On Wed, Jan 13, 2021 at 11:26 PM Eric Beabes <mailinglist...@gmail.com> wrote:

Ok. I will work on creating a reproducible app. Thanks.
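One way to narrow down a "does not support Update mode" failure in the reproducer above (a sketch, not code from the thread): temporarily replace the Kafka sink with the console sink, which also accepts Update mode. If the console variant fails the same way, the problem lies in the runtime's output-mode check rather than in the Kafka sink configuration. Here, `mapped` stands for the Dataset[String] produced by the map(...) step in Spark3Test.main:

// Hypothetical debugging variant of the writeStream call in Spark3Test.main;
// assumes `mapped` is the Dataset[String] built by the pipeline above.
mapped.writeStream
  .format("console")
  .option("truncate", "false")
  .option("checkpointLocation", "/tmp/checkpoint_console")
  .outputMode("update")
  .start()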
On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Just reached this thread. +1 on creating a simple reproducer app, and I suggest creating a JIRA and attaching the full driver and executor logs. Ping me on the JIRA and I'll pick this up right away...

Thanks!

G

On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

Would you mind if I ask for a simple reproducer? It would be nice if you could create a repository on GitHub and push the code, including the build script.

Thanks in advance!

On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes <mailinglist...@gmail.com> wrote:

I tried both. First I tried 3.0.0. That didn't work, so I tried 3.1.0.

On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

Which exact Spark version did you use? Did you make sure the version for Spark and the version for the spark-sql-kafka artifact are the same? (I ask because you said you used Spark 3.0, but the spark-sql-kafka dependency pointed to 3.1.0.)

On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes <mailinglist...@gmail.com> wrote:

org.apache.spark.sql.streaming.StreamingQueryException: Data source v2 streaming sinks does not support Update mode.
=== Streaming Query ===
Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId = 62410f05-db59-4026-83fc-439a79b01c29]
Current Committed Offsets: {}
Current Available Offsets: {}
Current State: INITIALIZING
Thread State: RUNNABLE
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.IllegalArgumentException: Data source v2 streaming sinks does not support Update mode.
at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
... 1 more

*Please see the attached image for more information.*

On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski <ja...@japila.pl> wrote:

Hi,

Can you post the whole message? I'm trying to find out what might be causing it. A small reproducible example would be of help too. Thank you.

Regards,
Jacek Laskowski
----
https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes <mailinglist...@gmail.com> wrote:

I'm trying to port my Spark 2.4 based Structured Streaming application to Spark 3.0. I compiled it using the dependency given below:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
  <version>3.1.0</version>
</dependency>

Every time I run it under Spark 3.0, I get this message: *Data source v2 streaming sinks does not support Update mode*

I am using '*mapGroupsWithState*', so per the output-modes section of the Structured Streaming programming guide (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes), the only supported output mode is "*Update*".

My sink is a Kafka topic, so I am using this:

.writeStream
.format("kafka")

What am I missing?
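Jungtaek's question above, whether the Spark version and the spark-sql-kafka artifact version match, can also be checked at startup. A sketch, not from the thread: it assumes the connector jar's manifest carries an Implementation-Version entry, which may not hold for every build; KafkaSourceProvider is the connector's entry class.

object VersionMismatchCheck {
  def main(args: Array[String]): Unit = {
    // Version of the Spark core classes on the classpath.
    val sparkVersion = org.apache.spark.SPARK_VERSION
    // Version recorded in the manifest of the jar that provides the Kafka
    // source, if present; falls back to "unknown" when the manifest lacks it.
    val connectorVersion = Option(
      Class.forName("org.apache.spark.sql.kafka010.KafkaSourceProvider").getPackage
    ).flatMap(p => Option(p.getImplementationVersion)).getOrElse("unknown")
    println(s"Spark runtime: $sparkVersion, spark-sql-kafka-0-10: $connectorVersion")
    if (connectorVersion != "unknown" && connectorVersion != sparkVersion) {
      println("WARNING: Spark and spark-sql-kafka versions differ; align them, e.g. via a single Maven property.")
    }
  }
}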