Re: Data source v2 streaming sinks does not support Update mode

2021-01-19 Thread Eric Beabes
Will do, thanks!

On Tue, Jan 19, 2021 at 1:39 PM Gabor Somogyi 
wrote:

> Thanks for double checking the version. Please report back with 3.1
> version whether it works or not.
>
> G
>
>
> On Tue, 19 Jan 2021, 07:41 Eric Beabes,  wrote:
>
>> Confirmed. The cluster Admin said his team installed the latest version
>> from Cloudera which comes with Spark 3.0.0-preview2. They are going to try
>> to upgrade it with the Community edition Spark 3.1.0.
>>
>> Thanks Jungtaek for the tip. Greatly appreciate it.
>>
>> On Tue, Jan 19, 2021 at 8:45 AM Eric Beabes 
>> wrote:
>>
>>> >> "Could you please make sure you're not using "3.0.0-preview".
>>>
>>> This could be the reason. I will check with our Hadoop cluster
>>> administrator. It's quite possible that they installed the "Preview" mode.
>>> Yes, the code works in the Local dev environment.
>>>
>>>
>>> On Tue, Jan 19, 2021 at 5:29 AM Jungtaek Lim <
>>> kabhwan.opensou...@gmail.com> wrote:
>>>
 I see no issue from running this code in local dev. (changed the scope
 of Spark artifacts to "compile" of course)

 Could you please make sure you're not using "3.0.0-preview"? In
 3.0.0-preview update mode was restricted (as the error message says) and it
 was reverted before releasing Spark 3.0.0. Clearing Spark artifacts in your
 .m2 cache may work.

 On Tue, Jan 19, 2021 at 8:50 AM Jungtaek Lim <
 kabhwan.opensou...@gmail.com> wrote:

> And also include some test data as well. I quickly looked through the
> code and the code may require a specific format of the record.
>
> On Tue, Jan 19, 2021 at 12:10 AM German Schiavon <
> gschiavonsp...@gmail.com> wrote:
>
>> Hi,
>>
>> This is the jira
>>  and
>> regarding the repo, I believe just commit it to your personal repo and 
>> that
>> should be it.
>>
>> Regards
>>
>> On Mon, 18 Jan 2021 at 15:46, Eric Beabes 
>> wrote:
>>
>>> Sorry. Can you please tell me where to create the JIRA? Also is
>>> there any specific Github repository I need to commit code into - OR - 
>>> just
>>> in our own? Please let me know. Thanks.
>>>
>>> On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <
>>> gabor.g.somo...@gmail.com> wrote:
>>>
 Thank you. As we've asked, could you please create a jira and
 commit the code into github?
 It would speed things up a lot.

 G


 On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <
 mailinglist...@gmail.com> wrote:

> Here's a very simple reproducer app. I've attached 3 files:
> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in 
> the
> email as well:
>
> package com.myorg
>
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.hadoop.security.UserGroupInformation
> import org.apache.kafka.clients.producer.ProducerConfig
> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>
> import scala.util.{Failure, Success, Try}
>
> object Spark3Test {
>
>   val isLocal = false
>
>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>   implicit val myStateEncoder: Encoder[MyState] = 
> Encoders.kryo[MyState]
>
>   val START_DATE_INDEX = 21
>   val END_DATE_INDEX = 40
>
>   def main(args: Array[String]) {
>
> val spark: SparkSession = initializeSparkSession("Spark 3.0 
> Upgrade", isLocal)
> spark.sparkContext.setLogLevel("WARN")
>
> readKafkaStream(spark)
>   .groupByKey(row => {
> row.substring(START_DATE_INDEX, END_DATE_INDEX)
>   })
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> updateAcrossEvents
>   )
>   .filter(row => !row.inProgress)
>   .map(row => "key: " + row.dateTime + " " + "count: " + 
> row.count)
>   .writeStream
>   .format("kafka")
>   .option(
> s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
> "10.29.42.141:9092"
> //"localhost:9092"
>   )
>   .option("topic", "spark3test")
>   .option("checkpointLocation", "/tmp/checkpoint_5")
>   .outputMode("update")
>   .start()
> manageStreamingQueries(spark)
>   }
>
>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>
> val stream = sparkSession.readStream
>   

Re: Data source v2 streaming sinks does not support Update mode

2021-01-19 Thread Gabor Somogyi
Thanks for double-checking the version. Please report back on whether it works
or not with version 3.1.

G


On Tue, 19 Jan 2021, 07:41 Eric Beabes,  wrote:

> Confirmed. The cluster Admin said his team installed the latest version
> from Cloudera which comes with Spark 3.0.0-preview2. They are going to try
> to upgrade it with the Community edition Spark 3.1.0.
>
> Thanks Jungtaek for the tip. Greatly appreciate it.
>
> On Tue, Jan 19, 2021 at 8:45 AM Eric Beabes 
> wrote:
>
>> >> "Could you please make sure you're not using "3.0.0-preview".
>>
>> This could be the reason. I will check with our Hadoop cluster
>> administrator. It's quite possible that they installed the "Preview" mode.
>> Yes, the code works in the Local dev environment.
>>
>>
>> On Tue, Jan 19, 2021 at 5:29 AM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> I see no issue from running this code in local dev. (changed the scope
>>> of Spark artifacts to "compile" of course)
>>>
>>> Could you please make sure you're not using "3.0.0-preview"? In
>>> 3.0.0-preview update mode was restricted (as the error message says) and it
>>> was reverted before releasing Spark 3.0.0. Clearing Spark artifacts in your
>>> .m2 cache may work.
>>>
>>> On Tue, Jan 19, 2021 at 8:50 AM Jungtaek Lim <
>>> kabhwan.opensou...@gmail.com> wrote:
>>>
 And also include some test data as well. I quickly looked through the
 code and the code may require a specific format of the record.

 On Tue, Jan 19, 2021 at 12:10 AM German Schiavon <
 gschiavonsp...@gmail.com> wrote:

> Hi,
>
> This is the jira
>  and regarding
> the repo, I believe just commit it to your personal repo and that should 
> be
> it.
>
> Regards
>
> On Mon, 18 Jan 2021 at 15:46, Eric Beabes 
> wrote:
>
>> Sorry. Can you please tell me where to create the JIRA? Also is there
>> any specific Github repository I need to commit code into - OR - just in
>> our own? Please let me know. Thanks.
>>
>> On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <
>> gabor.g.somo...@gmail.com> wrote:
>>
>>> Thank you. As we've asked, could you please create a jira and commit
>>> the code into github?
>>> It would speed things up a lot.
>>>
>>> G
>>>
>>>
>>> On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <
>>> mailinglist...@gmail.com> wrote:
>>>
 Here's a very simple reproducer app. I've attached 3 files:
 SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
 email as well:

 package com.myorg

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
 import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

 import scala.util.{Failure, Success, Try}

 object Spark3Test {

   val isLocal = false

   implicit val stringEncoder: Encoder[String] = Encoders.STRING
   implicit val myStateEncoder: Encoder[MyState] = 
 Encoders.kryo[MyState]

   val START_DATE_INDEX = 21
   val END_DATE_INDEX = 40

   def main(args: Array[String]) {

 val spark: SparkSession = initializeSparkSession("Spark 3.0 
 Upgrade", isLocal)
 spark.sparkContext.setLogLevel("WARN")

 readKafkaStream(spark)
   .groupByKey(row => {
 row.substring(START_DATE_INDEX, END_DATE_INDEX)
   })
   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
 updateAcrossEvents
   )
   .filter(row => !row.inProgress)
   .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
   .writeStream
   .format("kafka")
   .option(
 s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
 "10.29.42.141:9092"
 //"localhost:9092"
   )
   .option("topic", "spark3test")
   .option("checkpointLocation", "/tmp/checkpoint_5")
   .outputMode("update")
   .start()
 manageStreamingQueries(spark)
   }

   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {

 val stream = sparkSession.readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "10.29.42.141:9092")
   .option("subscribe", "inputTopic")
   .option("startingOffsets", "latest")
   .option("failOnDataLoss", "false")

Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Eric Beabes
Confirmed. The cluster admin said his team installed the latest version from
Cloudera, which comes with Spark 3.0.0-preview2. They are going to try to
upgrade it to the community edition of Spark 3.1.0.

Thanks Jungtaek for the tip. Greatly appreciate it.

On Tue, Jan 19, 2021 at 8:45 AM Eric Beabes 
wrote:

> >> "Could you please make sure you're not using "3.0.0-preview".
>
> This could be the reason. I will check with our Hadoop cluster
> administrator. It's quite possible that they installed the "Preview" mode.
> Yes, the code works in the Local dev environment.
>
>
> On Tue, Jan 19, 2021 at 5:29 AM Jungtaek Lim 
> wrote:
>
>> I see no issue from running this code in local dev. (changed the scope of
>> Spark artifacts to "compile" of course)
>>
>> Could you please make sure you're not using "3.0.0-preview"? In
>> 3.0.0-preview update mode was restricted (as the error message says) and it
>> was reverted before releasing Spark 3.0.0. Clearing Spark artifacts in your
>> .m2 cache may work.
>>
>> On Tue, Jan 19, 2021 at 8:50 AM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> And also include some test data as well. I quickly looked through the
>>> code and the code may require a specific format of the record.
>>>
>>> On Tue, Jan 19, 2021 at 12:10 AM German Schiavon <
>>> gschiavonsp...@gmail.com> wrote:
>>>
 Hi,

 This is the jira
  and regarding
 the repo, I believe just commit it to your personal repo and that should be
 it.

 Regards

 On Mon, 18 Jan 2021 at 15:46, Eric Beabes 
 wrote:

> Sorry. Can you please tell me where to create the JIRA? Also is there
> any specific Github repository I need to commit code into - OR - just in
> our own? Please let me know. Thanks.
>
> On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <
> gabor.g.somo...@gmail.com> wrote:
>
>> Thank you. As we've asked, could you please create a jira and commit
>> the code into github?
>> It would speed things up a lot.
>>
>> G
>>
>>
>> On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes 
>> wrote:
>>
>>> Here's a very simple reproducer app. I've attached 3 files:
>>> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
>>> email as well:
>>>
>>> package com.myorg
>>>
>>> import org.apache.hadoop.conf.Configuration
>>> import org.apache.hadoop.fs.{FileSystem, Path}
>>> import org.apache.hadoop.security.UserGroupInformation
>>> import org.apache.kafka.clients.producer.ProducerConfig
>>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
>>> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>>>
>>> import scala.util.{Failure, Success, Try}
>>>
>>> object Spark3Test {
>>>
>>>   val isLocal = false
>>>
>>>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>>>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>>>
>>>   val START_DATE_INDEX = 21
>>>   val END_DATE_INDEX = 40
>>>
>>>   def main(args: Array[String]) {
>>>
>>> val spark: SparkSession = initializeSparkSession("Spark 3.0 
>>> Upgrade", isLocal)
>>> spark.sparkContext.setLogLevel("WARN")
>>>
>>> readKafkaStream(spark)
>>>   .groupByKey(row => {
>>> row.substring(START_DATE_INDEX, END_DATE_INDEX)
>>>   })
>>>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>> updateAcrossEvents
>>>   )
>>>   .filter(row => !row.inProgress)
>>>   .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>>>   .writeStream
>>>   .format("kafka")
>>>   .option(
>>> s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
>>> "10.29.42.141:9092"
>>> //"localhost:9092"
>>>   )
>>>   .option("topic", "spark3test")
>>>   .option("checkpointLocation", "/tmp/checkpoint_5")
>>>   .outputMode("update")
>>>   .start()
>>> manageStreamingQueries(spark)
>>>   }
>>>
>>>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>>>
>>> val stream = sparkSession.readStream
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>>>   .option("subscribe", "inputTopic")
>>>   .option("startingOffsets", "latest")
>>>   .option("failOnDataLoss", "false")
>>>   .option("kafkaConsumer.pollTimeoutMs", "12")
>>>   .load()
>>>   .selectExpr("CAST(value AS STRING)")
>>>   .as[String](Encoders.STRING)
>>> stream
>>>   }
>>>
>>>   def updateAcrossEvents(key: String, inputs: Iterator[String], 
>>> oldState: GroupState[MyState]): MyState = {
>>> if 

Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Eric Beabes
>> "Could you please make sure you're not using "3.0.0-preview".

This could be the reason. I will check with our Hadoop cluster
administrator. It's quite possible that they installed the "preview" build.
Yes, the code works in the local dev environment.


On Tue, Jan 19, 2021 at 5:29 AM Jungtaek Lim 
wrote:

> I see no issue from running this code in local dev. (changed the scope of
> Spark artifacts to "compile" of course)
>
> Could you please make sure you're not using "3.0.0-preview"? In
> 3.0.0-preview update mode was restricted (as the error message says) and it
> was reverted before releasing Spark 3.0.0. Clearing Spark artifacts in your
> .m2 cache may work.
>
> On Tue, Jan 19, 2021 at 8:50 AM Jungtaek Lim 
> wrote:
>
>> And also include some test data as well. I quickly looked through the
>> code and the code may require a specific format of the record.
>>
>> On Tue, Jan 19, 2021 at 12:10 AM German Schiavon <
>> gschiavonsp...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> This is the jira  and
>>> regarding the repo, I believe just commit it to your personal repo and that
>>> should be it.
>>>
>>> Regards
>>>
>>> On Mon, 18 Jan 2021 at 15:46, Eric Beabes 
>>> wrote:
>>>
 Sorry. Can you please tell me where to create the JIRA? Also is there
 any specific Github repository I need to commit code into - OR - just in
 our own? Please let me know. Thanks.

 On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <
 gabor.g.somo...@gmail.com> wrote:

> Thank you. As we've asked, could you please create a jira and commit
> the code into github?
> It would speed things up a lot.
>
> G
>
>
> On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes 
> wrote:
>
>> Here's a very simple reproducer app. I've attached 3 files:
>> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
>> email as well:
>>
>> package com.myorg
>>
>> import org.apache.hadoop.conf.Configuration
>> import org.apache.hadoop.fs.{FileSystem, Path}
>> import org.apache.hadoop.security.UserGroupInformation
>> import org.apache.kafka.clients.producer.ProducerConfig
>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
>> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>>
>> import scala.util.{Failure, Success, Try}
>>
>> object Spark3Test {
>>
>>   val isLocal = false
>>
>>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>>
>>   val START_DATE_INDEX = 21
>>   val END_DATE_INDEX = 40
>>
>>   def main(args: Array[String]) {
>>
>> val spark: SparkSession = initializeSparkSession("Spark 3.0 
>> Upgrade", isLocal)
>> spark.sparkContext.setLogLevel("WARN")
>>
>> readKafkaStream(spark)
>>   .groupByKey(row => {
>> row.substring(START_DATE_INDEX, END_DATE_INDEX)
>>   })
>>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>> updateAcrossEvents
>>   )
>>   .filter(row => !row.inProgress)
>>   .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>>   .writeStream
>>   .format("kafka")
>>   .option(
>> s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
>> "10.29.42.141:9092"
>> //"localhost:9092"
>>   )
>>   .option("topic", "spark3test")
>>   .option("checkpointLocation", "/tmp/checkpoint_5")
>>   .outputMode("update")
>>   .start()
>> manageStreamingQueries(spark)
>>   }
>>
>>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>>
>> val stream = sparkSession.readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>>   .option("subscribe", "inputTopic")
>>   .option("startingOffsets", "latest")
>>   .option("failOnDataLoss", "false")
>>   .option("kafkaConsumer.pollTimeoutMs", "12")
>>   .load()
>>   .selectExpr("CAST(value AS STRING)")
>>   .as[String](Encoders.STRING)
>> stream
>>   }
>>
>>   def updateAcrossEvents(key: String, inputs: Iterator[String], 
>> oldState: GroupState[MyState]): MyState = {
>> if (!oldState.exists) {
>>   println(key)
>>   val state = MyState(key)
>>   oldState.update(state)
>>   oldState.setTimeoutDuration("1 minutes")
>>   oldState.get
>> } else {
>>   if (oldState.hasTimedOut) {
>> oldState.get.inProgress = false
>> val state = oldState.get
>> println("State timed out for key: " + state.dateTime)
>> oldState.remove()
>> state
>>  

Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Jungtaek Lim
I see no issue from running this code in local dev. (changed the scope of
Spark artifacts to "compile" of course)

Could you please make sure you're not using "3.0.0-preview"? In
3.0.0-preview update mode was restricted (as the error message says) and it
was reverted before releasing Spark 3.0.0. Clearing Spark artifacts in your
.m2 cache may work.
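
A quick way to confirm which Spark build the cluster actually runs is to log the
runtime version from the application itself. A minimal sketch (the fail-fast guard is
illustrative and not part of the original reproducer; only spark.version is a real API here):

import org.apache.spark.sql.SparkSession

object VersionCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("VersionCheck").getOrCreate()
    // Prints e.g. "3.0.0-preview2" vs "3.0.1" / "3.1.0"
    println(s"Runtime Spark version: ${spark.version}")
    require(!spark.version.contains("preview"),
      s"Preview build detected (${spark.version}); DSv2 sinks rejected update mode there.")
    spark.stop()
  }
}

If stale preview artifacts linger locally, removing them from the .m2 cache
(e.g. ~/.m2/repository/org/apache/spark) before rebuilding forces a fresh download.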

On Tue, Jan 19, 2021 at 8:50 AM Jungtaek Lim 
wrote:

> And also include some test data as well. I quickly looked through the code
> and the code may require a specific format of the record.
>
> On Tue, Jan 19, 2021 at 12:10 AM German Schiavon 
> wrote:
>
>> Hi,
>>
>> This is the jira  and
>> regarding the repo, I believe just commit it to your personal repo and that
>> should be it.
>>
>> Regards
>>
>> On Mon, 18 Jan 2021 at 15:46, Eric Beabes 
>> wrote:
>>
>>> Sorry. Can you please tell me where to create the JIRA? Also is there
>>> any specific Github repository I need to commit code into - OR - just in
>>> our own? Please let me know. Thanks.
>>>
>>> On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi 
>>> wrote:
>>>
 Thank you. As we've asked, could you please create a jira and commit
 the code into github?
 It would speed things up a lot.

 G


 On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes 
 wrote:

> Here's a very simple reproducer app. I've attached 3 files:
> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
> email as well:
>
> package com.myorg
>
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.hadoop.security.UserGroupInformation
> import org.apache.kafka.clients.producer.ProducerConfig
> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>
> import scala.util.{Failure, Success, Try}
>
> object Spark3Test {
>
>   val isLocal = false
>
>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>
>   val START_DATE_INDEX = 21
>   val END_DATE_INDEX = 40
>
>   def main(args: Array[String]) {
>
> val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", 
> isLocal)
> spark.sparkContext.setLogLevel("WARN")
>
> readKafkaStream(spark)
>   .groupByKey(row => {
> row.substring(START_DATE_INDEX, END_DATE_INDEX)
>   })
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> updateAcrossEvents
>   )
>   .filter(row => !row.inProgress)
>   .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>   .writeStream
>   .format("kafka")
>   .option(
> s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
> "10.29.42.141:9092"
> //"localhost:9092"
>   )
>   .option("topic", "spark3test")
>   .option("checkpointLocation", "/tmp/checkpoint_5")
>   .outputMode("update")
>   .start()
> manageStreamingQueries(spark)
>   }
>
>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>
> val stream = sparkSession.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>   .option("subscribe", "inputTopic")
>   .option("startingOffsets", "latest")
>   .option("failOnDataLoss", "false")
>   .option("kafkaConsumer.pollTimeoutMs", "12")
>   .load()
>   .selectExpr("CAST(value AS STRING)")
>   .as[String](Encoders.STRING)
> stream
>   }
>
>   def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: 
> GroupState[MyState]): MyState = {
> if (!oldState.exists) {
>   println(key)
>   val state = MyState(key)
>   oldState.update(state)
>   oldState.setTimeoutDuration("1 minutes")
>   oldState.get
> } else {
>   if (oldState.hasTimedOut) {
> oldState.get.inProgress = false
> val state = oldState.get
> println("State timed out for key: " + state.dateTime)
> oldState.remove()
> state
>   } else {
> val state = oldState.get
> state.count = state.count + 1
> oldState.update(state)
> oldState.setTimeoutDuration("1 minutes")
> oldState.get
>   }
> }
>   }
>
>   def initializeSparkSession(applicationName: String, isLocal: Boolean): 
> SparkSession = {
> UserGroupInformation.setLoginUser(
>   UserGroupInformation.createRemoteUser("hduser")
> )
>
> val builder 

Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Jungtaek Lim
And also include some test data as well. I quickly looked through the code
and the code may require a specific format of the record.
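
From the posted reproducer, the grouping key is taken from fixed character positions
(START_DATE_INDEX = 21, END_DATE_INDEX = 40), so each Kafka record value apparently has
to be at least 40 characters long with a date-time string in that slice. A hedged sketch
of a test record (only the substring bounds come from the code; the actual layout is an
assumption):

// Hypothetical test record for Spark3Test: row.substring(21, 40) must yield the key.
val padding      = "x" * 21                // characters 0..20: arbitrary payload (assumption)
val dateTime     = "2021-01-18T10:15:30"   // 19 characters, landing exactly in positions 21..39
val sampleRecord = padding + dateTime + " remaining-fields"
assert(sampleRecord.substring(21, 40) == dateTime)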

On Tue, Jan 19, 2021 at 12:10 AM German Schiavon 
wrote:

> Hi,
>
> This is the jira  and
> regarding the repo, I believe just commit it to your personal repo and that
> should be it.
>
> Regards
>
> On Mon, 18 Jan 2021 at 15:46, Eric Beabes 
> wrote:
>
>> Sorry. Can you please tell me where to create the JIRA? Also is there any
>> specific Github repository I need to commit code into - OR - just in our
>> own? Please let me know. Thanks.
>>
>> On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi 
>> wrote:
>>
>>> Thank you. As we've asked, could you please create a jira and commit the
>>> code into github?
>>> It would speed things up a lot.
>>>
>>> G
>>>
>>>
>>> On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes 
>>> wrote:
>>>
 Here's a very simple reproducer app. I've attached 3 files:
 SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
 email as well:

 package com.myorg

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
 import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

 import scala.util.{Failure, Success, Try}

 object Spark3Test {

   val isLocal = false

   implicit val stringEncoder: Encoder[String] = Encoders.STRING
   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]

   val START_DATE_INDEX = 21
   val END_DATE_INDEX = 40

   def main(args: Array[String]) {

 val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", 
 isLocal)
 spark.sparkContext.setLogLevel("WARN")

 readKafkaStream(spark)
   .groupByKey(row => {
 row.substring(START_DATE_INDEX, END_DATE_INDEX)
   })
   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
 updateAcrossEvents
   )
   .filter(row => !row.inProgress)
   .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
   .writeStream
   .format("kafka")
   .option(
 s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
 "10.29.42.141:9092"
 //"localhost:9092"
   )
   .option("topic", "spark3test")
   .option("checkpointLocation", "/tmp/checkpoint_5")
   .outputMode("update")
   .start()
 manageStreamingQueries(spark)
   }

   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {

 val stream = sparkSession.readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "10.29.42.141:9092")
   .option("subscribe", "inputTopic")
   .option("startingOffsets", "latest")
   .option("failOnDataLoss", "false")
   .option("kafkaConsumer.pollTimeoutMs", "12")
   .load()
   .selectExpr("CAST(value AS STRING)")
   .as[String](Encoders.STRING)
 stream
   }

   def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: 
 GroupState[MyState]): MyState = {
 if (!oldState.exists) {
   println(key)
   val state = MyState(key)
   oldState.update(state)
   oldState.setTimeoutDuration("1 minutes")
   oldState.get
 } else {
   if (oldState.hasTimedOut) {
 oldState.get.inProgress = false
 val state = oldState.get
 println("State timed out for key: " + state.dateTime)
 oldState.remove()
 state
   } else {
 val state = oldState.get
 state.count = state.count + 1
 oldState.update(state)
 oldState.setTimeoutDuration("1 minutes")
 oldState.get
   }
 }
   }

   def initializeSparkSession(applicationName: String, isLocal: Boolean): 
 SparkSession = {
 UserGroupInformation.setLoginUser(
   UserGroupInformation.createRemoteUser("hduser")
 )

 val builder = SparkSession
   .builder()
   .appName(applicationName)

 if (isLocal) {
   builder.config("spark.master", "local[2]")
 }

 builder.getOrCreate()
   }

   def manageStreamingQueries(spark: SparkSession): Unit = {

 val sparkQueryListener = new QueryListener()
 spark.streams.addListener(sparkQueryListener)

 val shutdownMarker: String = "/tmp/stop_job"

 val timeoutInMilliSeconds = 6
 while 

Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread German Schiavon
Hi,

This is the jira  and
regarding the repo, I believe just commit it to your personal repo and that
should be it.

Regards

On Mon, 18 Jan 2021 at 15:46, Eric Beabes  wrote:

> Sorry. Can you please tell me where to create the JIRA? Also is there any
> specific Github repository I need to commit code into - OR - just in our
> own? Please let me know. Thanks.
>
> On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi 
> wrote:
>
>> Thank you. As we've asked, could you please create a jira and commit the
>> code into github?
>> It would speed things up a lot.
>>
>> G
>>
>>
>> On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes 
>> wrote:
>>
>>> Here's a very simple reproducer app. I've attached 3 files:
>>> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
>>> email as well:
>>>
>>> package com.myorg
>>>
>>> import org.apache.hadoop.conf.Configuration
>>> import org.apache.hadoop.fs.{FileSystem, Path}
>>> import org.apache.hadoop.security.UserGroupInformation
>>> import org.apache.kafka.clients.producer.ProducerConfig
>>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
>>> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>>>
>>> import scala.util.{Failure, Success, Try}
>>>
>>> object Spark3Test {
>>>
>>>   val isLocal = false
>>>
>>>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>>>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>>>
>>>   val START_DATE_INDEX = 21
>>>   val END_DATE_INDEX = 40
>>>
>>>   def main(args: Array[String]) {
>>>
>>> val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", 
>>> isLocal)
>>> spark.sparkContext.setLogLevel("WARN")
>>>
>>> readKafkaStream(spark)
>>>   .groupByKey(row => {
>>> row.substring(START_DATE_INDEX, END_DATE_INDEX)
>>>   })
>>>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>> updateAcrossEvents
>>>   )
>>>   .filter(row => !row.inProgress)
>>>   .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>>>   .writeStream
>>>   .format("kafka")
>>>   .option(
>>> s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
>>> "10.29.42.141:9092"
>>> //"localhost:9092"
>>>   )
>>>   .option("topic", "spark3test")
>>>   .option("checkpointLocation", "/tmp/checkpoint_5")
>>>   .outputMode("update")
>>>   .start()
>>> manageStreamingQueries(spark)
>>>   }
>>>
>>>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>>>
>>> val stream = sparkSession.readStream
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>>>   .option("subscribe", "inputTopic")
>>>   .option("startingOffsets", "latest")
>>>   .option("failOnDataLoss", "false")
>>>   .option("kafkaConsumer.pollTimeoutMs", "12")
>>>   .load()
>>>   .selectExpr("CAST(value AS STRING)")
>>>   .as[String](Encoders.STRING)
>>> stream
>>>   }
>>>
>>>   def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: 
>>> GroupState[MyState]): MyState = {
>>> if (!oldState.exists) {
>>>   println(key)
>>>   val state = MyState(key)
>>>   oldState.update(state)
>>>   oldState.setTimeoutDuration("1 minutes")
>>>   oldState.get
>>> } else {
>>>   if (oldState.hasTimedOut) {
>>> oldState.get.inProgress = false
>>> val state = oldState.get
>>> println("State timed out for key: " + state.dateTime)
>>> oldState.remove()
>>> state
>>>   } else {
>>> val state = oldState.get
>>> state.count = state.count + 1
>>> oldState.update(state)
>>> oldState.setTimeoutDuration("1 minutes")
>>> oldState.get
>>>   }
>>> }
>>>   }
>>>
>>>   def initializeSparkSession(applicationName: String, isLocal: Boolean): 
>>> SparkSession = {
>>> UserGroupInformation.setLoginUser(
>>>   UserGroupInformation.createRemoteUser("hduser")
>>> )
>>>
>>> val builder = SparkSession
>>>   .builder()
>>>   .appName(applicationName)
>>>
>>> if (isLocal) {
>>>   builder.config("spark.master", "local[2]")
>>> }
>>>
>>> builder.getOrCreate()
>>>   }
>>>
>>>   def manageStreamingQueries(spark: SparkSession): Unit = {
>>>
>>> val sparkQueryListener = new QueryListener()
>>> spark.streams.addListener(sparkQueryListener)
>>>
>>> val shutdownMarker: String = "/tmp/stop_job"
>>>
>>> val timeoutInMilliSeconds = 6
>>> while (!spark.streams.active.isEmpty) {
>>>   Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
>>> case Success(result) =>
>>>   if (result) {
>>> println("A streaming query was terminated successfully")
>>> spark.streams.resetTerminated()
>>>   }
>>> case Failure(e) =>
>>>   

Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Eric Beabes
Sorry, can you please tell me where to create the JIRA? Also, is there any
specific GitHub repository I need to commit the code into, or just our own?
Please let me know. Thanks.

On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi 
wrote:

> Thank you. As we've asked, could you please create a jira and commit the
> code into github?
> It would speed things up a lot.
>
> G
>
>
> On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes 
> wrote:
>
>> Here's a very simple reproducer app. I've attached 3 files:
>> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
>> email as well:
>>
>> package com.myorg
>>
>> import org.apache.hadoop.conf.Configuration
>> import org.apache.hadoop.fs.{FileSystem, Path}
>> import org.apache.hadoop.security.UserGroupInformation
>> import org.apache.kafka.clients.producer.ProducerConfig
>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
>> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>>
>> import scala.util.{Failure, Success, Try}
>>
>> object Spark3Test {
>>
>>   val isLocal = false
>>
>>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>>
>>   val START_DATE_INDEX = 21
>>   val END_DATE_INDEX = 40
>>
>>   def main(args: Array[String]) {
>>
>> val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", 
>> isLocal)
>> spark.sparkContext.setLogLevel("WARN")
>>
>> readKafkaStream(spark)
>>   .groupByKey(row => {
>> row.substring(START_DATE_INDEX, END_DATE_INDEX)
>>   })
>>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>> updateAcrossEvents
>>   )
>>   .filter(row => !row.inProgress)
>>   .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>>   .writeStream
>>   .format("kafka")
>>   .option(
>> s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
>> "10.29.42.141:9092"
>> //"localhost:9092"
>>   )
>>   .option("topic", "spark3test")
>>   .option("checkpointLocation", "/tmp/checkpoint_5")
>>   .outputMode("update")
>>   .start()
>> manageStreamingQueries(spark)
>>   }
>>
>>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>>
>> val stream = sparkSession.readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>>   .option("subscribe", "inputTopic")
>>   .option("startingOffsets", "latest")
>>   .option("failOnDataLoss", "false")
>>   .option("kafkaConsumer.pollTimeoutMs", "12")
>>   .load()
>>   .selectExpr("CAST(value AS STRING)")
>>   .as[String](Encoders.STRING)
>> stream
>>   }
>>
>>   def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: 
>> GroupState[MyState]): MyState = {
>> if (!oldState.exists) {
>>   println(key)
>>   val state = MyState(key)
>>   oldState.update(state)
>>   oldState.setTimeoutDuration("1 minutes")
>>   oldState.get
>> } else {
>>   if (oldState.hasTimedOut) {
>> oldState.get.inProgress = false
>> val state = oldState.get
>> println("State timed out for key: " + state.dateTime)
>> oldState.remove()
>> state
>>   } else {
>> val state = oldState.get
>> state.count = state.count + 1
>> oldState.update(state)
>> oldState.setTimeoutDuration("1 minutes")
>> oldState.get
>>   }
>> }
>>   }
>>
>>   def initializeSparkSession(applicationName: String, isLocal: Boolean): 
>> SparkSession = {
>> UserGroupInformation.setLoginUser(
>>   UserGroupInformation.createRemoteUser("hduser")
>> )
>>
>> val builder = SparkSession
>>   .builder()
>>   .appName(applicationName)
>>
>> if (isLocal) {
>>   builder.config("spark.master", "local[2]")
>> }
>>
>> builder.getOrCreate()
>>   }
>>
>>   def manageStreamingQueries(spark: SparkSession): Unit = {
>>
>> val sparkQueryListener = new QueryListener()
>> spark.streams.addListener(sparkQueryListener)
>>
>> val shutdownMarker: String = "/tmp/stop_job"
>>
>> val timeoutInMilliSeconds = 6
>> while (!spark.streams.active.isEmpty) {
>>   Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
>> case Success(result) =>
>>   if (result) {
>> println("A streaming query was terminated successfully")
>> spark.streams.resetTerminated()
>>   }
>> case Failure(e) =>
>>   println("Query failed with message: " + e.getMessage)
>>   e.printStackTrace()
>>   spark.streams.resetTerminated()
>>   }
>>
>>   if (checkMarker(shutdownMarker)) {
>> spark.streams.active.foreach(query => {
>>   println(s"Stopping streaming query: ${query.id}")
>>   query.stop()
>> })
>> spark.stop()
>> 

Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Gabor Somogyi
Thank you. As we've asked, could you please create a jira and commit the
code into github?
It would speed things up a lot.

G


On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes 
wrote:

> Here's a very simple reproducer app. I've attached 3 files:
> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
> email as well:
>
> package com.myorg
>
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.hadoop.security.UserGroupInformation
> import org.apache.kafka.clients.producer.ProducerConfig
> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>
> import scala.util.{Failure, Success, Try}
>
> object Spark3Test {
>
>   val isLocal = false
>
>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>
>   val START_DATE_INDEX = 21
>   val END_DATE_INDEX = 40
>
>   def main(args: Array[String]) {
>
> val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", 
> isLocal)
> spark.sparkContext.setLogLevel("WARN")
>
> readKafkaStream(spark)
>   .groupByKey(row => {
> row.substring(START_DATE_INDEX, END_DATE_INDEX)
>   })
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> updateAcrossEvents
>   )
>   .filter(row => !row.inProgress)
>   .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>   .writeStream
>   .format("kafka")
>   .option(
> s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
> "10.29.42.141:9092"
> //"localhost:9092"
>   )
>   .option("topic", "spark3test")
>   .option("checkpointLocation", "/tmp/checkpoint_5")
>   .outputMode("update")
>   .start()
> manageStreamingQueries(spark)
>   }
>
>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>
> val stream = sparkSession.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>   .option("subscribe", "inputTopic")
>   .option("startingOffsets", "latest")
>   .option("failOnDataLoss", "false")
>   .option("kafkaConsumer.pollTimeoutMs", "12")
>   .load()
>   .selectExpr("CAST(value AS STRING)")
>   .as[String](Encoders.STRING)
> stream
>   }
>
>   def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: 
> GroupState[MyState]): MyState = {
> if (!oldState.exists) {
>   println(key)
>   val state = MyState(key)
>   oldState.update(state)
>   oldState.setTimeoutDuration("1 minutes")
>   oldState.get
> } else {
>   if (oldState.hasTimedOut) {
> oldState.get.inProgress = false
> val state = oldState.get
> println("State timed out for key: " + state.dateTime)
> oldState.remove()
> state
>   } else {
> val state = oldState.get
> state.count = state.count + 1
> oldState.update(state)
> oldState.setTimeoutDuration("1 minutes")
> oldState.get
>   }
> }
>   }
>
>   def initializeSparkSession(applicationName: String, isLocal: Boolean): 
> SparkSession = {
> UserGroupInformation.setLoginUser(
>   UserGroupInformation.createRemoteUser("hduser")
> )
>
> val builder = SparkSession
>   .builder()
>   .appName(applicationName)
>
> if (isLocal) {
>   builder.config("spark.master", "local[2]")
> }
>
> builder.getOrCreate()
>   }
>
>   def manageStreamingQueries(spark: SparkSession): Unit = {
>
> val sparkQueryListener = new QueryListener()
> spark.streams.addListener(sparkQueryListener)
>
> val shutdownMarker: String = "/tmp/stop_job"
>
> val timeoutInMilliSeconds = 6
> while (!spark.streams.active.isEmpty) {
>   Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
> case Success(result) =>
>   if (result) {
> println("A streaming query was terminated successfully")
> spark.streams.resetTerminated()
>   }
> case Failure(e) =>
>   println("Query failed with message: " + e.getMessage)
>   e.printStackTrace()
>   spark.streams.resetTerminated()
>   }
>
>   if (checkMarker(shutdownMarker)) {
> spark.streams.active.foreach(query => {
>   println(s"Stopping streaming query: ${query.id}")
>   query.stop()
> })
> spark.stop()
> removeMarker(shutdownMarker)
>   }
> }
> assert(spark.streams.active.isEmpty)
> spark.streams.removeListener(sparkQueryListener)
>   }
>
>   def checkMarker(markerFile: String): Boolean = {
> val fs = FileSystem.get(new Configuration())
> fs.exists(new Path(markerFile))
>   }
>
>   def removeMarker(markerFile: String): Unit = {
> val fs = FileSystem.get(new Configuration())
>   

Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Eric Beabes
Here's a very simple reproducer app. I've attached 3 files:
SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
email as well:

package com.myorg

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

import scala.util.{Failure, Success, Try}

object Spark3Test {

  val isLocal = false

  implicit val stringEncoder: Encoder[String] = Encoders.STRING
  implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]

  val START_DATE_INDEX = 21
  val END_DATE_INDEX = 40

  def main(args: Array[String]) {

val spark: SparkSession = initializeSparkSession("Spark 3.0
Upgrade", isLocal)
spark.sparkContext.setLogLevel("WARN")

readKafkaStream(spark)
  .groupByKey(row => {
row.substring(START_DATE_INDEX, END_DATE_INDEX)
  })
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
updateAcrossEvents
  )
  .filter(row => !row.inProgress)
  .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
  .writeStream
  .format("kafka")
  .option(
s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
"10.29.42.141:9092"
//"localhost:9092"
  )
  .option("topic", "spark3test")
  .option("checkpointLocation", "/tmp/checkpoint_5")
  .outputMode("update")
  .start()
manageStreamingQueries(spark)
  }

  def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {

val stream = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "10.29.42.141:9092")
  .option("subscribe", "inputTopic")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("kafkaConsumer.pollTimeoutMs", "12")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String](Encoders.STRING)
stream
  }

  def updateAcrossEvents(key: String, inputs: Iterator[String],
oldState: GroupState[MyState]): MyState = {
if (!oldState.exists) {
  println(key)
  val state = MyState(key)
  oldState.update(state)
  oldState.setTimeoutDuration("1 minutes")
  oldState.get
} else {
  if (oldState.hasTimedOut) {
oldState.get.inProgress = false
val state = oldState.get
println("State timed out for key: " + state.dateTime)
oldState.remove()
state
  } else {
val state = oldState.get
state.count = state.count + 1
oldState.update(state)
oldState.setTimeoutDuration("1 minutes")
oldState.get
  }
}
  }

  def initializeSparkSession(applicationName: String, isLocal:
Boolean): SparkSession = {
UserGroupInformation.setLoginUser(
  UserGroupInformation.createRemoteUser("hduser")
)

val builder = SparkSession
  .builder()
  .appName(applicationName)

if (isLocal) {
  builder.config("spark.master", "local[2]")
}

builder.getOrCreate()
  }

  def manageStreamingQueries(spark: SparkSession): Unit = {

val sparkQueryListener = new QueryListener()
spark.streams.addListener(sparkQueryListener)

val shutdownMarker: String = "/tmp/stop_job"

val timeoutInMilliSeconds = 6
while (!spark.streams.active.isEmpty) {
  Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
case Success(result) =>
  if (result) {
println("A streaming query was terminated successfully")
spark.streams.resetTerminated()
  }
case Failure(e) =>
  println("Query failed with message: " + e.getMessage)
  e.printStackTrace()
  spark.streams.resetTerminated()
  }

  if (checkMarker(shutdownMarker)) {
spark.streams.active.foreach(query => {
  println(s"Stopping streaming query: ${query.id}")
  query.stop()
})
spark.stop()
removeMarker(shutdownMarker)
  }
}
assert(spark.streams.active.isEmpty)
spark.streams.removeListener(sparkQueryListener)
  }

  def checkMarker(markerFile: String): Boolean = {
val fs = FileSystem.get(new Configuration())
fs.exists(new Path(markerFile))
  }

  def removeMarker(markerFile: String): Unit = {
val fs = FileSystem.get(new Configuration())
fs.delete(new Path(markerFile), true)
  }

}

case class MyState(var dateTime: String, var inProgress: Boolean =
true, var count: Int = 1)







package com.myorg

import org.apache.spark.sql.streaming.StreamingQueryListener

class QueryListener extends StreamingQueryListener {

  def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  def onQueryProgress(event:
StreamingQueryListener.QueryProgressEvent): Unit = {
if 
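
The QueryListener source is cut off above. A minimal complete StreamingQueryListener
that compiles against Spark 3.x looks roughly like this (the progress-logging body is an
assumption, not the original code):

package com.myorg

import org.apache.spark.sql.streaming.StreamingQueryListener

// Minimal sketch: Spark requires all three callbacks to be implemented.
class QueryListener extends StreamingQueryListener {

  def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    // Assumption: log basic batch throughput from the progress event.
    println(s"Query ${event.progress.id} processed ${event.progress.numInputRows} rows")
  }

  def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}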

Re: Data source v2 streaming sinks does not support Update mode

2021-01-13 Thread Eric Beabes
Ok. I will work on creating a reproducible app. Thanks.

On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi 
wrote:

> Just reached this thread. +1 on to create a simple reproducer app and I
> suggest to create a jira attaching the full driver and executor logs.
> Ping me on the jira and I'll pick this up right away...
>
> Thanks!
>
> G
>
>
> On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim 
> wrote:
>
>> Would you mind if I ask for a simple reproducer? Would be nice if you
>> could create a repository in Github and push the code including the build
>> script.
>>
>> Thanks in advance!
>>
>> On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes 
>> wrote:
>>
>>> I tried both. First tried 3.0.0. That didn't work so I tried 3.1.0.
>>>
>>> On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <
>>> kabhwan.opensou...@gmail.com> wrote:
>>>
 Which exact Spark version did you use? Did you make sure the version
 for Spark and the version for spark-sql-kafka artifact are the same? (I
 asked this because you've said you've used Spark 3.0 but spark-sql-kafka
 dependency pointed to 3.1.0.)

 On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes 
 wrote:

> org.apache.spark.sql.streaming.StreamingQueryException: Data source v2
> streaming sinks does not support Update mode. === Streaming Query ===
> Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId =
> 62410f05-db59-4026-83fc-439a79b01c29] Current Committed Offsets: {} 
> Current
> Available Offsets: {} Current State: INITIALIZING Thread State: RUNNABLE 
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
> Caused by: java.lang.IllegalArgumentException: Data source v2 streaming
> sinks does not support Update mode. at
> org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
> at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
> ... 1 more
>
>
> *Please see the attached image for more information.*
>
>
> On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski 
> wrote:
>
>> Hi,
>>
>> Can you post the whole message? I'm trying to find what might be
>> causing it. A small reproducible example would be of help too. Thank you.
>>
>> Regards,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books 
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> 
>>
>>
>> On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes 
>> wrote:
>>
>>> Trying to port my Spark 2.4 based (Structured) streaming application
>>> to Spark 3.0. I compiled it using the dependency given below:
>>>
>>> <dependency>
>>>     <groupId>org.apache.spark</groupId>
>>>     <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>     <version>3.1.0</version>
>>> </dependency>
>>>
>>>
>>> Every time I run it under Spark 3.0, I get this message: *Data
>>> source v2 streaming sinks does not support Update mode*
>>>
>>> I am using '*mapGroupsWithState*' so as per this link (
>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
>>> the only supported Output mode is "*Update*".
>>>
>>> My Sink is a Kafka topic so I am using this:
>>>
>>> .writeStream
>>> .format("kafka")
>>>
>>>
>>> What am I missing?
>>>
>>>
>>>




Re: Data source v2 streaming sinks does not support Update mode

2021-01-13 Thread Gabor Somogyi
Just reached this thread. +1 on creating a simple reproducer app, and I
suggest creating a jira with the full driver and executor logs attached.
Ping me on the jira and I'll pick this up right away...

Thanks!

G


On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim 
wrote:

> Would you mind if I ask for a simple reproducer? Would be nice if you
> could create a repository in Github and push the code including the build
> script.
>
> Thanks in advance!
>
> On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes 
> wrote:
>
>> I tried both. First tried 3.0.0. That didn't work so I tried 3.1.0.
>>
>> On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> Which exact Spark version did you use? Did you make sure the version for
>>> Spark and the version for spark-sql-kafka artifact are the same? (I asked
>>> this because you've said you've used Spark 3.0 but spark-sql-kafka
>>> dependency pointed to 3.1.0.)
>>>
>>> On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes 
>>> wrote:
>>>
 org.apache.spark.sql.streaming.StreamingQueryException: Data source v2
 streaming sinks does not support Update mode. === Streaming Query ===
 Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId =
 62410f05-db59-4026-83fc-439a79b01c29] Current Committed Offsets: {} Current
 Available Offsets: {} Current State: INITIALIZING Thread State: RUNNABLE at
 org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
 at
 org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
 Caused by: java.lang.IllegalArgumentException: Data source v2 streaming
 sinks does not support Update mode. at
 org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
 at
 org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
 at
 org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
 at 
 org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
 ... 1 more


 *Please see the attached image for more information.*


 On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski 
 wrote:

> Hi,
>
> Can you post the whole message? I'm trying to find what might be
> causing it. A small reproducible example would be of help too. Thank you.
>
> Regards,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books 
> Follow me on https://twitter.com/jaceklaskowski
>
> 
>
>
> On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes 
> wrote:
>
>> Trying to port my Spark 2.4 based (Structured) streaming application
>> to Spark 3.0. I compiled it using the dependency given below:
>>
>> <dependency>
>>     <groupId>org.apache.spark</groupId>
>>     <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>     <version>3.1.0</version>
>> </dependency>
>>
>>
>> Every time I run it under Spark 3.0, I get this message: *Data
>> source v2 streaming sinks does not support Update mode*
>>
>> I am using '*mapGroupsWithState*' so as per this link (
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
>> the only supported Output mode is "*Update*".
>>
>> My Sink is a Kafka topic so I am using this:
>>
>> .writeStream
>> .format("kafka")
>>
>>
>> What am I missing?
>>
>>
>>
>>>
>>>


Re: Data source v2 streaming sinks does not support Update mode

2021-01-12 Thread Jungtaek Lim
Would you mind if I ask for a simple reproducer? Would be nice if you could
create a repository in Github and push the code including the build script.

Thanks in advance!

On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes 
wrote:

> I tried both. First tried 3.0.0. That didn't work so I tried 3.1.0.
>
> On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>> Which exact Spark version did you use? Did you make sure the version for
>> Spark and the version for spark-sql-kafka artifact are the same? (I asked
>> this because you've said you've used Spark 3.0 but spark-sql-kafka
>> dependency pointed to 3.1.0.)
>>
>> On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes 
>> wrote:
>>
>>> org.apache.spark.sql.streaming.StreamingQueryException: Data source v2
>>> streaming sinks does not support Update mode. === Streaming Query ===
>>> Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId =
>>> 62410f05-db59-4026-83fc-439a79b01c29] Current Committed Offsets: {} Current
>>> Available Offsets: {} Current State: INITIALIZING Thread State: RUNNABLE at
>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
>>> at
>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>>> Caused by: java.lang.IllegalArgumentException: Data source v2 streaming
>>> sinks does not support Update mode. at
>>> org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
>>> at
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
>>> at
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
>>> at 
>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
>>> ... 1 more
>>>
>>>
>>> *Please see the attached image for more information.*
>>>
>>>
>>> On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski  wrote:
>>>
 Hi,

 Can you post the whole message? I'm trying to find what might be
 causing it. A small reproducible example would be of help too. Thank you.

 Regards,
 Jacek Laskowski
 
 https://about.me/JacekLaskowski
 "The Internals Of" Online Books 
 Follow me on https://twitter.com/jaceklaskowski

 


 On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes 
 wrote:

> Trying to port my Spark 2.4 based (Structured) streaming application
> to Spark 3.0. I compiled it using the dependency given below:
>
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>     <version>3.1.0</version>
> </dependency>
>
>
> Every time I run it under Spark 3.0, I get this message: *Data source
> v2 streaming sinks does not support Update mode*
>
> I am using '*mapGroupsWithState*' so as per this link (
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
> the only supported Output mode is "*Update*".
>
> My Sink is a Kafka topic so I am using this:
>
> .writeStream
> .format("kafka")
>
>
> What am I missing?
>
>
>
>>
>>


Re: Data source v2 streaming sinks does not support Update mode

2021-01-12 Thread Eric Beabes
I tried both. First tried 3.0.0. That didn't work so I tried 3.1.0.

On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim 
wrote:

> Which exact Spark version did you use? Did you make sure the version for
> Spark and the version for spark-sql-kafka artifact are the same? (I asked
> this because you've said you've used Spark 3.0 but spark-sql-kafka
> dependency pointed to 3.1.0.)
>
> On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes 
> wrote:
>
>> org.apache.spark.sql.streaming.StreamingQueryException: Data source v2
>> streaming sinks does not support Update mode. === Streaming Query ===
>> Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId =
>> 62410f05-db59-4026-83fc-439a79b01c29] Current Committed Offsets: {} Current
>> Available Offsets: {} Current State: INITIALIZING Thread State: RUNNABLE at
>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
>> at
>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>> Caused by: java.lang.IllegalArgumentException: Data source v2 streaming
>> sinks does not support Update mode. at
>> org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
>> at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
>> ... 1 more
>>
>>
>> *Please see the attached image for more information.*
>>
>>
>> On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski  wrote:
>>
>>> Hi,
>>>
>>> Can you post the whole message? I'm trying to find what might be causing
>>> it. A small reproducible example would be of help too. Thank you.
>>>
>>> Regards,
>>> Jacek Laskowski
>>> 
>>> https://about.me/JacekLaskowski
>>> "The Internals Of" Online Books 
>>> Follow me on https://twitter.com/jaceklaskowski
>>>
>>> 
>>>
>>>
>>> On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes 
>>> wrote:
>>>
 Trying to port my Spark 2.4 based (Structured) streaming application to
 Spark 3.0. I compiled it using the dependency given below:

 
 <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
     <version>3.1.0</version>
 </dependency>


 Every time I run it under Spark 3.0, I get this message: *Data source
 v2 streaming sinks does not support Update mode*

 I am using '*mapGroupsWithState*' so as per this link (
 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
 the only supported Output mode is "*Update*".

 My Sink is a Kafka topic so I am using this:

 .writeStream
 .format("kafka")


 What am I missing?



>
>


Re: Data source v2 streaming sinks does not support Update mode

2021-01-12 Thread Jungtaek Lim
Which exact Spark version did you use? Did you make sure the version for
Spark and the version for spark-sql-kafka artifact are the same? (I asked
this because you've said you've used Spark 3.0 but spark-sql-kafka
dependency pointed to 3.1.0.)
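
One way to keep the Spark runtime and the spark-sql-kafka connector in lockstep is to
drive every Spark artifact from a single version value. A sketch in sbt terms (the thread
itself uses Maven, where the same idea is a shared version property; the version number
below is illustrative):

// build.sbt sketch; assumption: an sbt build, while the reproducer actually uses Maven.
val sparkVersion = "3.0.1"  // match whatever the cluster runs

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)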

On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes 
wrote:

> org.apache.spark.sql.streaming.StreamingQueryException: Data source v2
> streaming sinks does not support Update mode. === Streaming Query ===
> Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId =
> 62410f05-db59-4026-83fc-439a79b01c29] Current Committed Offsets: {} Current
> Available Offsets: {} Current State: INITIALIZING Thread State: RUNNABLE at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
> Caused by: java.lang.IllegalArgumentException: Data source v2 streaming
> sinks does not support Update mode. at
> org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
> at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
> ... 1 more
>
>
> *Please see the attached image for more information.*
>
>
> On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski  wrote:
>
>> Hi,
>>
>> Can you post the whole message? I'm trying to find what might be causing
>> it. A small reproducible example would be of help too. Thank you.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books 
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> 
>>
>>
>> On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes 
>> wrote:
>>
>>> Trying to port my Spark 2.4 based (Structured) streaming application to
>>> Spark 3.0. I compiled it using the dependency given below:
>>>
>>> <dependency>
>>>     <groupId>org.apache.spark</groupId>
>>>     <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>     <version>3.1.0</version>
>>> </dependency>
>>>
>>>
>>> Every time I run it under Spark 3.0, I get this message: *Data source
>>> v2 streaming sinks does not support Update mode*
>>>
>>> I am using '*mapGroupsWithState*' so as per this link (
>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
>>> the only supported Output mode is "*Update*".
>>>
>>> My Sink is a Kafka topic so I am using this:
>>>
>>> .writeStream
>>> .format("kafka")
>>>
>>>
>>> What am I missing?
>>>
>>>
>>>


Re: Data source v2 streaming sinks does not support Update mode

2021-01-12 Thread Jacek Laskowski
Hi,

Can you post the whole message? I'm trying to find what might be causing
it. A small reproducible example would be of help too. Thank you.

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books 
Follow me on https://twitter.com/jaceklaskowski




On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes 
wrote:

> Trying to port my Spark 2.4 based (Structured) streaming application to
> Spark 3.0. I compiled it using the dependency given below:
>
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>     <version>3.1.0</version>
> </dependency>
>
>
> Every time I run it under Spark 3.0, I get this message: *Data source v2
> streaming sinks does not support Update mode*
>
> I am using '*mapGroupsWithState*' so as per this link (
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
> the only supported Output mode is "*Update*".
>
> My Sink is a Kafka topic so I am using this:
>
> .writeStream
> .format("kafka")
>
>
> What am I missing?
>
>
>
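
For reference, the combination described in the question (mapGroupsWithState with the
update output mode writing to a Kafka sink) does run on a GA Spark 3.x build, as confirmed
later in the thread. A condensed sketch (brokers, topics, checkpoint path and the state
type are placeholders, not taken from the original application):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

object UpdateModeKafkaSketch {

  case class Counter(key: String, count: Long)

  // Count records per key across micro-batches.
  def updateCount(key: String, rows: Iterator[String], state: GroupState[Counter]): Counter = {
    val updated = Counter(key, state.getOption.map(_.count).getOrElse(0L) + rows.size)
    state.update(updated)
    updated
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("UpdateModeKafkaSketch").getOrCreate()
    import spark.implicits._

    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder
      .option("subscribe", "inputTopic")                     // placeholder
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .groupByKey(identity)
      .mapGroupsWithState(GroupStateTimeout.NoTimeout())(updateCount)
      .map(c => s"${c.key}:${c.count}")                      // becomes the Kafka "value" column
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")   // placeholder
      .option("topic", "outputTopic")                         // placeholder
      .option("checkpointLocation", "/tmp/checkpoint")        // placeholder
      .outputMode("update")                                   // the mode mapGroupsWithState requires
      .start()
      .awaitTermination()
  }
}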