> Sorry. Can you please tell me where to create the JIRA? Also is there any
> specific Github repository I need to commit code into - OR - just in our
> own? Please let me know. Thanks.
>> Thanks you, as we've asked could you please create a jira and commit the
>> code into github?
>> It would speed things up a lot.
>> G
>>> Here's a very simple reproducer app. I've attached 3 files:
>>> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
>>> email as well:
>>> package com.myorg
>>> import org.apache.hadoop.conf.Configuration
>>> import org.apache.hadoop.fs.{FileSystem, Path}
>>> import org.apache.hadoop.security.UserGroupInformation
>>> import org.apache.kafka.clients.producer.ProducerConfig
>>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
>>> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>>> import scala.util.{Failure, Success, Try}
>>> object Spark3Test {
>>>   val isLocal = false
>>>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>>>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>>>   val START_DATE_INDEX = 21
>>>   val END_DATE_INDEX = 40
>>>   def main(args: Array[String]) {
>>>     val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", 
>>> isLocal)
>>>     spark.sparkContext.setLogLevel("WARN")
>>>     readKafkaStream(spark)
>>>       .groupByKey(row => {
>>>         row.substring(START_DATE_INDEX, END_DATE_INDEX)
>>>       })
>>>       .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>         updateAcrossEvents
>>>       )
>>>       .filter(row => !row.inProgress)
>>>       .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>>>       .writeStream
>>>       .format("kafka")
>>>       .option(
>>>         s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
>>>         ""
>>> //        "localhost:9092"
>>>       )
>>>       .option("topic", "spark3test")
>>>       .option("checkpointLocation", "/tmp/checkpoint_5")
>>>       .outputMode("update")
>>>       .start()
>>>     manageStreamingQueries(spark)
>>>   }
>>>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>>>     val stream = sparkSession.readStream
>>>       .format("kafka")
>>>       .option("kafka.bootstrap.servers", "")
>>>       .option("subscribe", "inputTopic")
>>>       .option("startingOffsets", "latest")
>>>       .option("failOnDataLoss", "false")
>>>       .option("kafkaConsumer.pollTimeoutMs", "120000")
>>>       .load()
>>>       .selectExpr("CAST(value AS STRING)")
>>>       .as[String](Encoders.STRING)
>>>     stream
>>>   }
>>>   def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: 
>>> GroupState[MyState]): MyState = {
>>>     if (!oldState.exists) {
>>>       println(key)
>>>       val state = MyState(key)
>>>       oldState.update(state)
>>>       oldState.setTimeoutDuration("1 minutes")
>>>       oldState.get
>>>     } else {
>>>       if (oldState.hasTimedOut) {
>>>         oldState.get.inProgress = false
>>>         val state = oldState.get
>>>         println("State timed out for key: " + state.dateTime)
>>>         oldState.remove()
>>>         state
>>>       } else {
>>>         val state = oldState.get
>>>         state.count = state.count + 1
>>>         oldState.update(state)
>>>         oldState.setTimeoutDuration("1 minutes")
>>>         oldState.get
>>>       }
>>>     }
>>>   }
>>>   def initializeSparkSession(applicationName: String, isLocal: Boolean): 
>>> SparkSession = {
>>>     UserGroupInformation.setLoginUser(
>>>       UserGroupInformation.createRemoteUser("hduser")
>>>     )
>>>     val builder = SparkSession
>>>       .builder()
>>>       .appName(applicationName)
>>>     if (isLocal) {
>>>       builder.config("spark.master", "local[2]")
>>>     }
>>>     builder.getOrCreate()
>>>   }
>>>   def manageStreamingQueries(spark: SparkSession): Unit = {
>>>     val sparkQueryListener = new QueryListener()
>>>     spark.streams.addListener(sparkQueryListener)
>>>     val shutdownMarker: String = "/tmp/stop_job"
>>>     val timeoutInMilliSeconds = 60000
>>>     while (!spark.streams.active.isEmpty) {
>>>       Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
>>>         case Success(result) =>
>>>           if (result) {
>>>             println("A streaming query was terminated successfully")
>>>             spark.streams.resetTerminated()
>>>           }
>>>         case Failure(e) =>
>>>           println("Query failed with message: " + e.getMessage)
>>>           e.printStackTrace()
>>>           spark.streams.resetTerminated()
>>>       }
>>>       if (checkMarker(shutdownMarker)) {
>>>         spark.streams.active.foreach(query => {
>>>           println(s"Stopping streaming query: ${query.id}")
>>>           query.stop()
>>>         })
>>>         spark.stop()
>>>         removeMarker(shutdownMarker)
>>>       }
>>>     }
>>>     assert(spark.streams.active.isEmpty)
>>>     spark.streams.removeListener(sparkQueryListener)
>>>   }
>>>   def checkMarker(markerFile: String): Boolean = {
>>>     val fs = FileSystem.get(new Configuration())
>>>     fs.exists(new Path(markerFile))
>>>   }
>>>   def removeMarker(markerFile: String): Unit = {
>>>     val fs = FileSystem.get(new Configuration())
>>>     fs.delete(new Path(markerFile), true)
>>>   }
>>> }
>>> case class MyState(var dateTime: String, var inProgress: Boolean = true, 
>>> var count: Int = 1)
>>> package com.myorg
>>> import org.apache.spark.sql.streaming.StreamingQueryListener
>>> class QueryListener extends StreamingQueryListener {
>>>   def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit 
>>> = {}
>>>   def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): 
>>> Unit = {
>>>     if (event.progress.numInputRows != 0) {
>>>       println(
>>>         s"InputRows: ${event.progress.numInputRows}"
>>>       )
>>>     }
>>>   }
>>>   def onQueryTerminated(event: 
>>> StreamingQueryListener.QueryTerminatedEvent): Unit = {
>>>     println(s"Query with id ${event.id} terminated")
>>>   }
>>> }
>>> <project xmlns="http://maven.apache.org/POM/4.0.0"; 
>>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>>>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
>>> http://maven.apache.org/maven-v4_0_0.xsd";>
>>>   <modelVersion>4.0.0</modelVersion>
>>>   <groupId>com.myorg</groupId>
>>>   <artifactId>spark-3-conversion</artifactId>
>>>   <packaging>jar</packaging>
>>>   <version>1.0-SNAPSHOT</version>
>>>   <name>spark-3-conversion</name>
>>>   <url>http://maven.apache.org</url>
>>>   <properties>
>>>     <spark.version>3.0.0</spark.version>
>>>     <scala.binary.version>2.12</scala.binary.version>
>>>     <scala.version>2.12.10</scala.version>
>>>     <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
>>>     <skipTests>true</skipTests>
>>>     <maven.compiler.source>1.5</maven.compiler.source>
>>>     <maven.compiler.target>1.5</maven.compiler.target>
>>>     <encoding>UTF-8</encoding>
>>>   </properties>
>>>   <dependencies>
>>>     <dependency>
>>>       <groupId>org.scala-lang</groupId>
>>>       <artifactId>scala-library</artifactId>
>>>       <version>${scala.version}</version>
>>>     </dependency>
>>>     <dependency>
>>>       <groupId>org.apache.spark</groupId>
>>>       <artifactId>spark-core_${scala.binary.version}</artifactId>
>>>       <version>${spark.version}</version>
>>>       <scope>provided</scope>
>>>     </dependency>
>>>     <dependency>
>>>       <groupId>org.apache.spark</groupId>
>>>       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
>>>       <version>${spark.version}</version>
>>>       <scope>provided</scope>
>>>     </dependency>
>>>     <dependency>
>>>       <groupId>org.apache.spark</groupId>
>>>       <artifactId>spark-sql_${scala.binary.version}</artifactId>
>>>       <version>${spark.version}</version>
>>>       <scope>provided</scope>
>>>     </dependency>
>>>     <dependency>
>>>       <groupId>org.apache.spark</groupId>
>>> <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
>>>       <version>${spark.version}</version>
>>>     </dependency>
>>>     <dependency>
>>>       <groupId>org.apache.spark</groupId>
>>>       <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>       <version>${spark.version}</version>
>>>     </dependency>
>>>     <dependency>
>>>       <groupId>org.slf4j</groupId>
>>>       <artifactId>slf4j-log4j12</artifactId>
>>>       <version>1.7.7</version>
>>>       <scope>runtime</scope>
>>>     </dependency>
>>>     <dependency>
>>>       <groupId>log4j</groupId>
>>>       <artifactId>log4j</artifactId>
>>>       <version>1.2.17</version>
>>>       <scope>compile</scope>
>>>     </dependency>
>>>     <dependency>
>>>       <groupId>org.scalatest</groupId>
>>>       <artifactId>scalatest_${scala.binary.version}</artifactId>
>>>       <version>3.0.7</version>
>>>       <scope>test</scope>
>>>     </dependency>
>>>     <dependency>
>>>       <groupId>junit</groupId>
>>>       <artifactId>junit</artifactId>
>>>       <version>3.8.1</version>
>>>       <scope>test</scope>
>>>     </dependency>
>>>   </dependencies>
>>>   <build>
>>>     <plugins>
>>>       <plugin>
>>>         <groupId>org.apache.maven.plugins</groupId>
>>>         <artifactId>maven-shade-plugin</artifactId>
>>>         <version>3.0.0</version>
>>>         <executions>
>>>           <!-- Run shade goal on package phase -->
>>>           <execution>
>>>             <phase>install</phase>
>>>             <goals>
>>>               <goal>shade</goal>
>>>             </goals>
>>>           </execution>
>>>         </executions>
>>>       </plugin>
>>>       <!-- Scala Compiler -->
>>>       <plugin>
>>>         <groupId>net.alchim31.maven</groupId>
>>>         <artifactId>scala-maven-plugin</artifactId>
>>>         <version>3.2.2</version>
>>>         <executions>
>>>           <execution>
>>>             <goals>
>>>               <goal>compile</goal>
>>>               <goal>testCompile</goal>
>>>             </goals>
>>>           </execution>
>>>         </executions>
>>>       </plugin>
>>>       <plugin>
>>>         <groupId>org.codehaus.mojo</groupId>
>>>         <artifactId>build-helper-maven-plugin</artifactId>
>>>         <version>1.7</version>
>>>         <executions>
>>>           <!-- Add src/main/scala to eclipse build path -->
>>>           <execution>
>>>             <id>add-source</id>
>>>             <phase>generate-sources</phase>
>>>             <goals>
>>>               <goal>add-source</goal>
>>>             </goals>
>>>             <configuration>
>>>               <sources>
>>>                 <source>src/main/scala</source>
>>>               </sources>
>>>             </configuration>
>>>           </execution>
>>>           <!-- Add src/test/scala to eclipse build path -->
>>>           <execution>
>>>             <id>add-test-source</id>
>>>             <phase>generate-test-sources</phase>
>>>             <goals>
>>>               <goal>add-test-source</goal>
>>>             </goals>
>>>             <configuration>
>>>               <sources>
>>>                 <source>src/test/scala</source>
>>>               </sources>
>>>             </configuration>
>>>           </execution>
>>>         </executions>
>>>       </plugin>
>>>       <!-- we disable surefile and enable scalatest so that maven can run 
>>> our tests -->
>>>       <plugin>
>>>         <groupId>org.apache.maven.plugins</groupId>
>>>         <artifactId>maven-surefire-plugin</artifactId>
>>>         <version>2.7</version>
>>>         <configuration>
>>>           <skipTests>true</skipTests>
>>>         </configuration>
>>>       </plugin>
>>>       <plugin>
>>>         <groupId>org.scalatest</groupId>
>>>         <artifactId>scalatest-maven-plugin</artifactId>
>>>         <version>1.0</version>
>>>         <configuration>
>>> <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
>>>           <junitxml>.</junitxml>
>>>           <filereports>WDF TestSuite.txt</filereports>
>>>         </configuration>
>>>         <executions>
>>>           <execution>
>>>             <id>test</id>
>>>             <goals>
>>>               <goal>test</goal>
>>>             </goals>
>>>           </execution>
>>>         </executions>
>>>       </plugin>
>>>       <plugin>
>>>         <groupId>org.scalastyle</groupId>
>>>         <artifactId>scalastyle-maven-plugin</artifactId>
>>>         <version>1.0.0</version>
>>>         <configuration>
>>>           <verbose>false</verbose>
>>>           <failOnViolation>true</failOnViolation>
>>>           <includeTestSourceDirectory>true</includeTestSourceDirectory>
>>>           <failOnWarning>false</failOnWarning>
>>> <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
>>> <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
>>>           <configLocation>lib/scalastyle_config.xml</configLocation>
>>> <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
>>>           <outputEncoding>UTF-8</outputEncoding>
>>>         </configuration>
>>>         <executions>
>>>           <execution>
>>>             <goals>
>>>               <goal>check</goal>
>>>             </goals>
>>>           </execution>
>>>         </executions>
>>>       </plugin>
>>>       <plugin>
>>>         <groupId>org.sonarsource.scanner.maven</groupId>
>>>         <artifactId>sonar-maven-plugin</artifactId>
>>>         <version></version>
>>>       </plugin>
>>>     </plugins>
>>>   </build>
>>> </project>
Ok. I will work on creating a reproducible app. Thanks.
>>>>> Just reached this thread. +1 on to create a simple reproducer app and
>>>>> I suggest to create a jira attaching the full driver and executor logs.
>>>>> Ping me on the jira and I'll pick this up right away...
>>>>> Thanks!
>>>>> G
>>>>>> Would you mind if I ask for a simple reproducer? Would be nice if you
>>>>>> could create a repository in Github and push the code including the build
>>>>>> script.
>>>>>> Thanks in advance!
I tried both. First tried 3.0.0. That didn't work so I tried 3.1.0.
>>>>>>>> Which exact Spark version did you use? Did you make sure the
>>>>>>>> version for Spark and the version for spark-sql-kafka artifact are the
>>>>>>>> same? (I asked this because you've said you've used Spark 3.0 but
>>>>>>>> spark-sql-kafka dependency pointed to 3.1.0.)
>>>>>>>>> org.apache.spark.sql.streaming.StreamingQueryException: Data
>>>>>>>>> source v2 streaming sinks does not support Update mode. === Streaming 
>>>>>>>>> Query
>>>>>>>>> === Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId =
>>>>>>>>> 62410f05-db59-4026-83fc-439a79b01c29] Current Committed Offsets: {} 
>>>>>>>>> Current
>>>>>>>>> Available Offsets: {} Current State: INITIALIZING Thread State: 
>>>>>>>>> RUNNABLE at
>>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Data source v2 
>>>>>>>>> streaming
>>>>>>>>> sinks does not support Update mode. at
>>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
>>>>>>>>> at 
>>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
>>>>>>>>> ... 1 more
>>>>>>>>> *Please see the attached image for more information.*
>>>>>>>>>> Hi,
>>>>>>>>>> Can you post the whole message? I'm trying to find what might be
>>>>>>>>>> causing it. A small reproducible example would be of help too. Thank 
>>>>>>>>>> you.
>>>>>>>>>> Pozdrawiam,
>>>>>>>>>> Jacek Laskowski
>>>>>>>>>> ----
>>>>>>>>>> https://about.me/JacekLaskowski
>>>>>>>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>>>>>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>>>>>>> <https://twitter.com/jaceklaskowski>
>>>>>>>>>>> Trying to port my Spark 2.4 based (Structured) streaming
>>>>>>>>>>> application to Spark 3.0. I compiled it using the dependency given 
>>>>>>>>>>> below:
>>>>>>>>>>> <dependency>
>>>>>>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>>>>>> <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>>>>>>>>>     <version>3.1.0</version>
>>>>>>>>>>> </dependency>
>>>>>>>>>>> Every time I run it under Spark 3.0, I get this message: *Data
>>>>>>>>>>> source v2 streaming sinks does not support Update mode*
>>>>>>>>>>> I am using '*mapGroupsWithState*' so as per this link (
>>>>>>>>>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
>>>>>>>>>>> the only supported Output mode is "*Update*".
>>>>>>>>>>> My Sink is a Kafka topic so I am using this:
>>>>>>>>>>> .writeStream
>>>>>>>>>>> .format("kafka")
>>>>>>>>>>> What am I missing?
