Hi,

Here is the JIRA: <https://issues.apache.org/jira/projects/SPARK/summary>. Regarding the repo, I believe committing it to your personal repo should be enough.
Regards

On Mon, 18 Jan 2021 at 15:46, Eric Beabes <mailinglist...@gmail.com> wrote:

Sorry. Can you please tell me where to create the JIRA? Also, is there any specific GitHub repository I need to commit the code into, or just our own? Please let me know. Thanks.

On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Thank you. As we've asked, could you please create a JIRA and commit the code to GitHub? It would speed things up a lot.

G

On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <mailinglist...@gmail.com> wrote:

Here's a very simple reproducer app. I've attached 3 files: SparkTest.scala, QueryListener.scala & pom.xml. Copying the contents in the email as well:

package com.myorg

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

import scala.util.{Failure, Success, Try}

object Spark3Test {

  val isLocal = false

  implicit val stringEncoder: Encoder[String] = Encoders.STRING
  implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]

  val START_DATE_INDEX = 21
  val END_DATE_INDEX = 40

  def main(args: Array[String]) {

    val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal)
    spark.sparkContext.setLogLevel("WARN")

    readKafkaStream(spark)
      .groupByKey(row => {
        row.substring(START_DATE_INDEX, END_DATE_INDEX)
      })
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )
      .filter(row => !row.inProgress)
      .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
      .writeStream
      .format("kafka")
      .option(
        s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
        "10.29.42.141:9092"
        // "localhost:9092"
      )
      .option("topic", "spark3test")
      .option("checkpointLocation", "/tmp/checkpoint_5")
      .outputMode("update")
      .start()
    manageStreamingQueries(spark)
  }

  def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {

    val stream = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.29.42.141:9092")
      .option("subscribe", "inputTopic")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafkaConsumer.pollTimeoutMs", "120000")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String](Encoders.STRING)
    stream
  }

  def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: GroupState[MyState]): MyState = {
    if (!oldState.exists) {
      println(key)
      val state = MyState(key)
      oldState.update(state)
      oldState.setTimeoutDuration("1 minutes")
      oldState.get
    } else {
      if (oldState.hasTimedOut) {
        oldState.get.inProgress = false
        val state = oldState.get
        println("State timed out for key: " + state.dateTime)
        oldState.remove()
        state
      } else {
        val state = oldState.get
        state.count = state.count + 1
        oldState.update(state)
        oldState.setTimeoutDuration("1 minutes")
        oldState.get
      }
    }
  }

  def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = {

    UserGroupInformation.setLoginUser(
      UserGroupInformation.createRemoteUser("hduser")
    )

    val builder = SparkSession
      .builder()
      .appName(applicationName)

    if (isLocal) {
      builder.config("spark.master", "local[2]")
    }

    builder.getOrCreate()
  }

  def manageStreamingQueries(spark: SparkSession): Unit = {

    val sparkQueryListener = new QueryListener()
    spark.streams.addListener(sparkQueryListener)

    val shutdownMarker: String = "/tmp/stop_job"

    val timeoutInMilliSeconds = 60000
    while (!spark.streams.active.isEmpty) {
      Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
        case Success(result) =>
          if (result) {
            println("A streaming query was terminated successfully")
            spark.streams.resetTerminated()
          }
        case Failure(e) =>
          println("Query failed with message: " + e.getMessage)
          e.printStackTrace()
          spark.streams.resetTerminated()
      }

      if (checkMarker(shutdownMarker)) {
        spark.streams.active.foreach(query => {
          println(s"Stopping streaming query: ${query.id}")
          query.stop()
        })
        spark.stop()
        removeMarker(shutdownMarker)
      }
    }
    assert(spark.streams.active.isEmpty)
    spark.streams.removeListener(sparkQueryListener)
  }

  def checkMarker(markerFile: String): Boolean = {
    val fs = FileSystem.get(new Configuration())
    fs.exists(new Path(markerFile))
  }

  def removeMarker(markerFile: String): Unit = {
    val fs = FileSystem.get(new Configuration())
    fs.delete(new Path(markerFile), true)
  }

}

case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)
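One note on the shutdown mechanism above: manageStreamingQueries polls for a marker file and stops all active queries once it appears, so a graceful shutdown can be requested from outside the job. A minimal sketch of such a trigger (not part of the attached files; it assumes the same Hadoop Configuration defaults and the /tmp/stop_job path used by shutdownMarker):

package com.myorg

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Creates the marker file that manageStreamingQueries polls for;
// the path must match shutdownMarker in SparkTest.scala.
object RequestShutdown {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    fs.create(new Path("/tmp/stop_job")).close()
  }
}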
package com.myorg

import org.apache.spark.sql.streaming.StreamingQueryListener

class QueryListener extends StreamingQueryListener {

  def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    if (event.progress.numInputRows != 0) {
      println(
        s"InputRows: ${event.progress.numInputRows}"
      )
    }
  }

  def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    println(s"Query with id ${event.id} terminated")
  }
}
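To narrow down whether the Update-mode error discussed further down comes from the Kafka sink specifically, one could also point the same pipeline at the console sink, which supports Update mode per the Structured Streaming guide. A sketch only, meant to stand in for the writeStream block inside Spark3Test.main (the implicit encoders defined there are assumed to be in scope):

readKafkaStream(spark)
  .groupByKey(row => row.substring(START_DATE_INDEX, END_DATE_INDEX))
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
  .filter(row => !row.inProgress)
  .map(row => "key: " + row.dateTime + " count: " + row.count)
  .writeStream
  .format("console")  // console sink supports Update mode
  .option("checkpointLocation", "/tmp/checkpoint_console")
  .outputMode("update")
  .start()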
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.myorg</groupId>
  <artifactId>spark-3-conversion</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>spark-3-conversion</name>
  <url>http://maven.apache.org</url>

  <properties>
    <spark.version>3.0.0</spark.version>
    <scala.binary.version>2.12</scala.binary.version>
    <scala.version>2.12.10</scala.version>
    <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
    <skipTests>true</skipTests>
    <maven.compiler.source>1.5</maven.compiler.source>
    <maven.compiler.target>1.5</maven.compiler.target>
    <encoding>UTF-8</encoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.binary.version}</artifactId>
      <version>3.0.7</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>install</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <!-- Scala Compiler -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>1.7</version>
        <executions>
          <!-- Add src/main/scala to eclipse build path -->
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/main/scala</source>
              </sources>
            </configuration>
          </execution>
          <!-- Add src/test/scala to eclipse build path -->
          <execution>
            <id>add-test-source</id>
            <phase>generate-test-sources</phase>
            <goals>
              <goal>add-test-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/test/scala</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <!-- We disable surefire and enable scalatest so that Maven can run our tests -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.7</version>
        <configuration>
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>1.0</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>WDF TestSuite.txt</filereports>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.scalastyle</groupId>
        <artifactId>scalastyle-maven-plugin</artifactId>
        <version>1.0.0</version>
        <configuration>
          <verbose>false</verbose>
          <failOnViolation>true</failOnViolation>
          <includeTestSourceDirectory>true</includeTestSourceDirectory>
          <failOnWarning>false</failOnWarning>
          <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
          <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
          <configLocation>lib/scalastyle_config.xml</configLocation>
          <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
          <outputEncoding>UTF-8</outputEncoding>
        </configuration>
        <executions>
          <execution>
            <goals>
              <goal>check</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.sonarsource.scanner.maven</groupId>
        <artifactId>sonar-maven-plugin</artifactId>
        <version>3.6.0.1398</version>
      </plugin>

    </plugins>
  </build>
</project>

On Wed, Jan 13, 2021 at 11:26 PM Eric Beabes <mailinglist...@gmail.com> wrote:

Ok. I will work on creating a reproducible app. Thanks.

On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Just reached this thread. +1 on creating a simple reproducer app, and I suggest creating a JIRA with the full driver and executor logs attached. Ping me on the JIRA and I'll pick this up right away...

Thanks!

G

On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

Would you mind if I ask for a simple reproducer? It would be nice if you could create a repository on GitHub and push the code, including the build script.

Thanks in advance!

On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes <mailinglist...@gmail.com> wrote:

I tried both. First tried 3.0.0. That didn't work, so I tried 3.1.0.

On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

Which exact Spark version did you use? Did you make sure the version for Spark and the version for the spark-sql-kafka artifact are the same? (I ask because you said you used Spark 3.0, but the spark-sql-kafka dependency pointed to 3.1.0.)

On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes <mailinglist...@gmail.com> wrote:

org.apache.spark.sql.streaming.StreamingQueryException: Data source v2 streaming sinks does not support Update mode.
=== Streaming Query ===
Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId = 62410f05-db59-4026-83fc-439a79b01c29]
Current Committed Offsets: {}
Current Available Offsets: {}

Current State: INITIALIZING
Thread State: RUNNABLE
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.IllegalArgumentException: Data source v2 streaming sinks does not support Update mode.
    at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
    ... 1 more

*Please see the attached image for more information.*

On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski <ja...@japila.pl> wrote:

Hi,

Can you post the whole message? I'm trying to find what might be causing it. A small reproducible example would be of help too. Thank you.

Regards,
Jacek Laskowski
----
https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes <mailinglist...@gmail.com> wrote:

I'm trying to port my Spark 2.4 based (Structured) Streaming application to Spark 3.0. I compiled it using the dependency given below:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
    <version>3.1.0</version>
</dependency>

Every time I run it under Spark 3.0, I get this message: *Data source v2 streaming sinks does not support Update mode*

I am using '*mapGroupsWithState*', so as per this link (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes), the only supported output mode is "*Update*".

My sink is a Kafka topic, so I am using this:

.writeStream
.format("kafka")

What am I missing?
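Following up on Jungtaek's version question above: a quick way to confirm that the Spark runtime and the Kafka connector on the classpath actually match is to print both versions from the driver. This is a sketch under stated assumptions: the class name is the connector's real entry point, but reading its version relies on the jar manifest carrying an Implementation-Version entry, which may be absent depending on how the jar was built.

object VersionCheck {
  def main(args: Array[String]): Unit = {
    // Version of the Spark runtime actually on the classpath.
    println(s"Spark runtime: ${org.apache.spark.SPARK_VERSION}")
    // Look up the connector class reflectively (it is private to its package)
    // and read the Implementation-Version from its jar manifest, if present.
    val connectorVersion =
      Option(Class.forName("org.apache.spark.sql.kafka010.KafkaSourceProvider").getPackage)
        .flatMap(p => Option(p.getImplementationVersion))
        .getOrElse("unknown (no Implementation-Version in the jar manifest)")
    println(s"spark-sql-kafka connector: $connectorVersion")
  }
}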