Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Eric Beabes
Confirmed. The cluster admin said his team installed the latest version
from Cloudera, which comes with Spark 3.0.0-preview2. They are going to try
upgrading it to the community edition of Spark 3.1.0.

Thanks Jungtaek for the tip. Greatly appreciate it.

On Tue, Jan 19, 2021 at 8:45 AM Eric Beabes 
wrote:

> >> "Could you please make sure you're not using "3.0.0-preview".
>
> This could be the reason. I will check with our Hadoop cluster
> administrator. It's quite possible that they installed the "Preview" mode.
> Yes, the code works in the Local dev environment.
>
>
> On Tue, Jan 19, 2021 at 5:29 AM Jungtaek Lim 
> wrote:
>
>> I see no issue from running this code in local dev. (changed the scope of
>> Spark artifacts to "compile" of course)
>>
>> Could you please make sure you're not using "3.0.0-preview"? In
>> 3.0.0-preview update mode was restricted (as the error message says) and it
>> was reverted before releasing Spark 3.0.0. Clearing Spark artifacts in your
>> .m2 cache may work.
>>
>> On Tue, Jan 19, 2021 at 8:50 AM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> And also include some test data as well. I quickly looked through the
>>> code and the code may require a specific format of the record.
>>>
>>> On Tue, Jan 19, 2021 at 12:10 AM German Schiavon <
>>> gschiavonsp...@gmail.com> wrote:
>>>
 Hi,

 This is the jira
  and regarding
 the repo, I believe just commit it to your personal repo and that should be
 it.

 Regards

 On Mon, 18 Jan 2021 at 15:46, Eric Beabes 
 wrote:

> Sorry. Can you please tell me where to create the JIRA? Also is there
> any specific Github repository I need to commit code into - OR - just in
> our own? Please let me know. Thanks.
>
> On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <
> gabor.g.somo...@gmail.com> wrote:
>
>> Thank you. As we've asked, could you please create a JIRA and commit
>> the code to GitHub?
>> It would speed things up a lot.
>>
>> G
>>
>>
>> On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes 
>> wrote:
>>
>>> Here's a very simple reproducer app. I've attached 3 files:
>>> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
>>> email as well:
>>>
>>> package com.myorg
>>>
>>> import org.apache.hadoop.conf.Configuration
>>> import org.apache.hadoop.fs.{FileSystem, Path}
>>> import org.apache.hadoop.security.UserGroupInformation
>>> import org.apache.kafka.clients.producer.ProducerConfig
>>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
>>> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>>>
>>> import scala.util.{Failure, Success, Try}
>>>
>>> object Spark3Test {
>>>
>>>   val isLocal = false
>>>
>>>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>>>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>>>
>>>   val START_DATE_INDEX = 21
>>>   val END_DATE_INDEX = 40
>>>
>>>   def main(args: Array[String]) {
>>>
>>> val spark: SparkSession = initializeSparkSession("Spark 3.0 
>>> Upgrade", isLocal)
>>> spark.sparkContext.setLogLevel("WARN")
>>>
>>> readKafkaStream(spark)
>>>   .groupByKey(row => {
>>> row.substring(START_DATE_INDEX, END_DATE_INDEX)
>>>   })
>>>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>> updateAcrossEvents
>>>   )
>>>   .filter(row => !row.inProgress)
>>>   .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>>>   .writeStream
>>>   .format("kafka")
>>>   .option(
>>> s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
>>> "10.29.42.141:9092"
>>> //"localhost:9092"
>>>   )
>>>   .option("topic", "spark3test")
>>>   .option("checkpointLocation", "/tmp/checkpoint_5")
>>>   .outputMode("update")
>>>   .start()
>>> manageStreamingQueries(spark)
>>>   }
>>>
>>>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>>>
>>> val stream = sparkSession.readStream
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>>>   .option("subscribe", "inputTopic")
>>>   .option("startingOffsets", "latest")
>>>   .option("failOnDataLoss", "false")
>>>   .option("kafkaConsumer.pollTimeoutMs", "12")
>>>   .load()
>>>   .selectExpr("CAST(value AS STRING)")
>>>   .as[String](Encoders.STRING)
>>> stream
>>>   }
>>>
>>>   def updateAcrossEvents(key: String, inputs: Iterator[String], 
>>> oldState: GroupState[MyState]): MyState = {
>>> if 

Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Eric Beabes
>> "Could you please make sure you're not using "3.0.0-preview".

This could be the reason. I will check with our Hadoop cluster
administrator. It's quite possible that they installed the "Preview" mode.
Yes, the code works in the Local dev environment.


On Tue, Jan 19, 2021 at 5:29 AM Jungtaek Lim 
wrote:

> I see no issue from running this code in local dev. (changed the scope of
> Spark artifacts to "compile" of course)
>
> Could you please make sure you're not using "3.0.0-preview"? In
> 3.0.0-preview update mode was restricted (as the error message says) and it
> was reverted before releasing Spark 3.0.0. Clearing Spark artifacts in your
> .m2 cache may work.
>
> On Tue, Jan 19, 2021 at 8:50 AM Jungtaek Lim 
> wrote:
>
>> And also include some test data as well. I quickly looked through the
>> code and the code may require a specific format of the record.
>>
>> On Tue, Jan 19, 2021 at 12:10 AM German Schiavon <
>> gschiavonsp...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> This is the jira  and
>>> regarding the repo, I believe just commit it to your personal repo and that
>>> should be it.
>>>
>>> Regards
>>>
>>> On Mon, 18 Jan 2021 at 15:46, Eric Beabes 
>>> wrote:
>>>
 Sorry. Can you please tell me where to create the JIRA? Also is there
 any specific Github repository I need to commit code into - OR - just in
 our own? Please let me know. Thanks.

 On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <
 gabor.g.somo...@gmail.com> wrote:

> Thank you. As we've asked, could you please create a JIRA and commit
> the code to GitHub?
> It would speed things up a lot.
>
> G
>
>
> On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes 
> wrote:
>
>> Here's a very simple reproducer app. I've attached 3 files:
>> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
>> email as well:
>>
>> package com.myorg
>>
>> import org.apache.hadoop.conf.Configuration
>> import org.apache.hadoop.fs.{FileSystem, Path}
>> import org.apache.hadoop.security.UserGroupInformation
>> import org.apache.kafka.clients.producer.ProducerConfig
>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
>> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>>
>> import scala.util.{Failure, Success, Try}
>>
>> object Spark3Test {
>>
>>   val isLocal = false
>>
>>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>>
>>   val START_DATE_INDEX = 21
>>   val END_DATE_INDEX = 40
>>
>>   def main(args: Array[String]) {
>>
>> val spark: SparkSession = initializeSparkSession("Spark 3.0 
>> Upgrade", isLocal)
>> spark.sparkContext.setLogLevel("WARN")
>>
>> readKafkaStream(spark)
>>   .groupByKey(row => {
>> row.substring(START_DATE_INDEX, END_DATE_INDEX)
>>   })
>>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>> updateAcrossEvents
>>   )
>>   .filter(row => !row.inProgress)
>>   .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>>   .writeStream
>>   .format("kafka")
>>   .option(
>> s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
>> "10.29.42.141:9092"
>> //"localhost:9092"
>>   )
>>   .option("topic", "spark3test")
>>   .option("checkpointLocation", "/tmp/checkpoint_5")
>>   .outputMode("update")
>>   .start()
>> manageStreamingQueries(spark)
>>   }
>>
>>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>>
>> val stream = sparkSession.readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>>   .option("subscribe", "inputTopic")
>>   .option("startingOffsets", "latest")
>>   .option("failOnDataLoss", "false")
>>   .option("kafkaConsumer.pollTimeoutMs", "12")
>>   .load()
>>   .selectExpr("CAST(value AS STRING)")
>>   .as[String](Encoders.STRING)
>> stream
>>   }
>>
>>   def updateAcrossEvents(key: String, inputs: Iterator[String], 
>> oldState: GroupState[MyState]): MyState = {
>> if (!oldState.exists) {
>>   println(key)
>>   val state = MyState(key)
>>   oldState.update(state)
>>   oldState.setTimeoutDuration("1 minutes")
>>   oldState.get
>> } else {
>>   if (oldState.hasTimedOut) {
>> oldState.get.inProgress = false
>> val state = oldState.get
>> println("State timed out for key: " + state.dateTime)
>> oldState.remove()
>> state
>>  

Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Jungtaek Lim
I see no issue from running this code in local dev. (changed the scope of
Spark artifacts to "compile" of course)

Could you please make sure you're not using "3.0.0-preview"? In
3.0.0-preview, update mode was restricted (as the error message says), and that
restriction was reverted before Spark 3.0.0 was released. Clearing the Spark
artifacts in your .m2 cache may work.
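
For reference, a minimal sketch (not taken from the thread) of how to confirm
which Spark version the job actually links against at runtime; `spark.version`
reports the version of the Spark artifacts on the classpath:

```scala
import org.apache.spark.sql.SparkSession

object VersionCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("version-check").getOrCreate()
    // Should print e.g. "3.0.0" or "3.1.0"; "3.0.0-preview2" would explain the error.
    println(s"Spark version: ${spark.version}")
    spark.stop()
  }
}
```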

On Tue, Jan 19, 2021 at 8:50 AM Jungtaek Lim 
wrote:

> And also include some test data as well. I quickly looked through the code
> and the code may require a specific format of the record.
>
> On Tue, Jan 19, 2021 at 12:10 AM German Schiavon 
> wrote:
>
>> Hi,
>>
>> This is the jira  and
>> regarding the repo, I believe just commit it to your personal repo and that
>> should be it.
>>
>> Regards
>>
>> On Mon, 18 Jan 2021 at 15:46, Eric Beabes 
>> wrote:
>>
>>> Sorry. Can you please tell me where to create the JIRA? Also is there
>>> any specific Github repository I need to commit code into - OR - just in
>>> our own? Please let me know. Thanks.
>>>
>>> On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi 
>>> wrote:
>>>
 Thank you. As we've asked, could you please create a JIRA and commit
 the code to GitHub?
 It would speed things up a lot.

 G


 On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes 
 wrote:

> Here's a very simple reproducer app. I've attached 3 files:
> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
> email as well:
>
> package com.myorg
>
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.hadoop.security.UserGroupInformation
> import org.apache.kafka.clients.producer.ProducerConfig
> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>
> import scala.util.{Failure, Success, Try}
>
> object Spark3Test {
>
>   val isLocal = false
>
>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>
>   val START_DATE_INDEX = 21
>   val END_DATE_INDEX = 40
>
>   def main(args: Array[String]) {
>
> val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", 
> isLocal)
> spark.sparkContext.setLogLevel("WARN")
>
> readKafkaStream(spark)
>   .groupByKey(row => {
> row.substring(START_DATE_INDEX, END_DATE_INDEX)
>   })
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> updateAcrossEvents
>   )
>   .filter(row => !row.inProgress)
>   .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>   .writeStream
>   .format("kafka")
>   .option(
> s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
> "10.29.42.141:9092"
> //"localhost:9092"
>   )
>   .option("topic", "spark3test")
>   .option("checkpointLocation", "/tmp/checkpoint_5")
>   .outputMode("update")
>   .start()
> manageStreamingQueries(spark)
>   }
>
>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>
> val stream = sparkSession.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>   .option("subscribe", "inputTopic")
>   .option("startingOffsets", "latest")
>   .option("failOnDataLoss", "false")
>   .option("kafkaConsumer.pollTimeoutMs", "12")
>   .load()
>   .selectExpr("CAST(value AS STRING)")
>   .as[String](Encoders.STRING)
> stream
>   }
>
>   def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: 
> GroupState[MyState]): MyState = {
> if (!oldState.exists) {
>   println(key)
>   val state = MyState(key)
>   oldState.update(state)
>   oldState.setTimeoutDuration("1 minutes")
>   oldState.get
> } else {
>   if (oldState.hasTimedOut) {
> oldState.get.inProgress = false
> val state = oldState.get
> println("State timed out for key: " + state.dateTime)
> oldState.remove()
> state
>   } else {
> val state = oldState.get
> state.count = state.count + 1
> oldState.update(state)
> oldState.setTimeoutDuration("1 minutes")
> oldState.get
>   }
> }
>   }
>
>   def initializeSparkSession(applicationName: String, isLocal: Boolean): 
> SparkSession = {
> UserGroupInformation.setLoginUser(
>   UserGroupInformation.createRemoteUser("hduser")
> )
>
> val builder 

Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Jungtaek Lim
And also include some test data as well. I quickly looked through the code
and the code may require a specific format of the record.
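
For reference, a hypothetical record layout that would satisfy the reproducer's
key extraction (`substring(START_DATE_INDEX = 21, END_DATE_INDEX = 40)`); the
real record format is not shown in the thread, so the field positions below are
an assumption:

```scala
// Hypothetical sample record: a 21-character prefix followed by a 19-character
// timestamp, so that substring(21, 40) returns the grouping key.
val prefix = "record-000000000001 |"                 // exactly 21 characters
val sample = prefix + "2021-01-18 13:00:00" + "|rest-of-payload"
assert(sample.substring(21, 40) == "2021-01-18 13:00:00")
```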

On Tue, Jan 19, 2021 at 12:10 AM German Schiavon 
wrote:

> Hi,
>
> This is the jira  and
> regarding the repo, I believe just commit it to your personal repo and that
> should be it.
>
> Regards
>
> On Mon, 18 Jan 2021 at 15:46, Eric Beabes 
> wrote:
>
>> Sorry. Can you please tell me where to create the JIRA? Also is there any
>> specific Github repository I need to commit code into - OR - just in our
>> own? Please let me know. Thanks.
>>
>> On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi 
>> wrote:
>>
>>> Thank you. As we've asked, could you please create a JIRA and commit the
>>> code to GitHub?
>>> It would speed things up a lot.
>>>
>>> G
>>>
>>>
>>> On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes 
>>> wrote:
>>>
 Here's a very simple reproducer app. I've attached 3 files:
 SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
 email as well:

 package com.myorg

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
 import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

 import scala.util.{Failure, Success, Try}

 object Spark3Test {

   val isLocal = false

   implicit val stringEncoder: Encoder[String] = Encoders.STRING
   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]

   val START_DATE_INDEX = 21
   val END_DATE_INDEX = 40

   def main(args: Array[String]) {

 val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", 
 isLocal)
 spark.sparkContext.setLogLevel("WARN")

 readKafkaStream(spark)
   .groupByKey(row => {
 row.substring(START_DATE_INDEX, END_DATE_INDEX)
   })
   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
 updateAcrossEvents
   )
   .filter(row => !row.inProgress)
   .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
   .writeStream
   .format("kafka")
   .option(
 s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
 "10.29.42.141:9092"
 //"localhost:9092"
   )
   .option("topic", "spark3test")
   .option("checkpointLocation", "/tmp/checkpoint_5")
   .outputMode("update")
   .start()
 manageStreamingQueries(spark)
   }

   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {

 val stream = sparkSession.readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "10.29.42.141:9092")
   .option("subscribe", "inputTopic")
   .option("startingOffsets", "latest")
   .option("failOnDataLoss", "false")
   .option("kafkaConsumer.pollTimeoutMs", "12")
   .load()
   .selectExpr("CAST(value AS STRING)")
   .as[String](Encoders.STRING)
 stream
   }

   def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: 
 GroupState[MyState]): MyState = {
 if (!oldState.exists) {
   println(key)
   val state = MyState(key)
   oldState.update(state)
   oldState.setTimeoutDuration("1 minutes")
   oldState.get
 } else {
   if (oldState.hasTimedOut) {
 oldState.get.inProgress = false
 val state = oldState.get
 println("State timed out for key: " + state.dateTime)
 oldState.remove()
 state
   } else {
 val state = oldState.get
 state.count = state.count + 1
 oldState.update(state)
 oldState.setTimeoutDuration("1 minutes")
 oldState.get
   }
 }
   }

   def initializeSparkSession(applicationName: String, isLocal: Boolean): 
 SparkSession = {
 UserGroupInformation.setLoginUser(
   UserGroupInformation.createRemoteUser("hduser")
 )

 val builder = SparkSession
   .builder()
   .appName(applicationName)

 if (isLocal) {
   builder.config("spark.master", "local[2]")
 }

 builder.getOrCreate()
   }

   def manageStreamingQueries(spark: SparkSession): Unit = {

 val sparkQueryListener = new QueryListener()
 spark.streams.addListener(sparkQueryListener)

 val shutdownMarker: String = "/tmp/stop_job"

 val timeoutInMilliSeconds = 6
 while 

[Datasource API V2] Creating datasource - no step for final cleanup on read

2021-01-18 Thread Alex Rehnby
Hello,

Currently working on creating a custom datasource using the Spark
Datasource API V2. On read, our datasource uses some temporary files in a
distributed store which we'd like to run some cleanup step on once the
entire operation is done. However, the API does not seem to provide any hook
that is called when the entire read is done, only the close() function
on individual PartitionReaders.

What I was looking for would be the equivalent to the commit() and abort()
functions in BatchWrite, but for the Scan or Batch class. I'm wondering if
there's any good way to achieve running something at the end of the read
operation using the current API? If not, I would ask if this might be a
useful addition, or if there are design reasons for not including such a
step.

Thanks,
Alex
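
For reference, one possible workaround sketch (an assumption, not something the
current DSv2 API offers directly): register a `QueryExecutionListener` and run
the cleanup once the query that consumed the custom source has finished.
Signatures below are as of Spark 3.x; check them against your version.
`cleanupTempFiles` is a hypothetical function that deletes the temporary files.

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Runs the supplied cleanup after the query finishes, whether it succeeded or failed.
class TempFileCleanupListener(cleanupTempFiles: () => Unit) extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    cleanupTempFiles()
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    cleanupTempFiles()
}

// Registration in the application that runs the read (cleanup function is hypothetical):
// spark.listenerManager.register(new TempFileCleanupListener(() => cleanupTempFiles()))
```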


RE: [Spark Structured Streaming] Processing the data path coming from kafka.

2021-01-18 Thread Boris Litvak
There you go:


@udf(returnType=ArrayType(StringType()))
def reader_udf(filename: str) -> List[str]:
with open(filename, "r") as f:
return f.read().split('\n')


def run_locally():
with utils.build_spark_session("Local", local=True) as spark:
df = spark.readStream.csv(r'testdata', 
schema=StructType([StructField('filename', StringType(), True)]))
df = df.withColumn('content', reader_udf(col('filename')))
q = 
df.select(explode('content')).writeStream.queryName('test').format('console')\
.option('truncate', False).start()
q.awaitTermination()


From: Amit Joshi 
Sent: Monday, 18 January 2021 20:22
To: Boris Litvak 
Cc: spark-user 
Subject: Re: [Spark Structured Streaming] Processing the data path coming from 
kafka.

Hi Boris,

Thanks for your code block.
I understood what you are trying to achieve in the code.

But the contents of the file are JSON records separated by newlines.
And we have to make the dataframe out of it, as some processing has to be done 
on it.

Regards
Amit
On Monday, January 18, 2021, Boris Litvak 
mailto:boris.lit...@skf.com>> wrote:
HI Amit,

I was thinking along the lines of (python):


@udf(returnType=StringType())
def reader_udf(filename: str) -> str:
with open(filename, "r") as f:
return f.read()


def run_locally():
with utils.build_spark_session("Local", local=True) as spark:
df = spark.readStream.csv(r'testdata', 
schema=StructType([StructField('filename', StringType(), True)]))
df = df.withColumn('content', reader_udf(col('filename')))
q = 
df.select('content').writeStream.queryName('test').format('console').start()
q.awaitTermination()

Now each row contains the contents of the files, provided they are not large 
you can foreach() over the df/rdd and do whatever you want with it, such as 
json.loads()/etc.
If you know the schema of the jsons, you can later explode() them into a flat 
DF, ala 
https://stackoverflow.com/questions/38243717/spark-explode-nested-json-with-array-in-scala

Note that unless I am missing something you cannot access spark session from 
foreach as code is not running on the driver.
Please say if it makes sense or did I miss anything.

Boris

From: Amit Joshi mailto:mailtojoshia...@gmail.com>>
Sent: Monday, 18 January 2021 17:10
To: Boris Litvak mailto:boris.lit...@skf.com>>
Cc: spark-user mailto:user@spark.apache.org>>
Subject: Re: [Spark Structured Streaming] Processing the data path coming from 
kafka.

Hi Boris,

I need to do processing on the data present in the path.
That is the reason I am trying to make the dataframe.

Can you please provide the example of your solution?

Regards
Amit

On Mon, Jan 18, 2021 at 7:15 PM Boris Litvak 
mailto:boris.lit...@skf.com>> wrote:
Hi Amit,

Why won’t you just map()/mapXXX() the kafkaDf with the mapping function that 
reads the paths?
Also, do you really have to read the json into an additional dataframe?

Thanks, Boris

From: Amit Joshi mailto:mailtojoshia...@gmail.com>>
Sent: Monday, 18 January 2021 15:04
To: spark-user mailto:user@spark.apache.org>>
Subject: [Spark Structured Streaming] Processing the data path coming from 
kafka.

Hi ,

I have a use case where the file path of the json records stored in s3 are 
coming as a kafka
message in kafka. I have to process the data using spark structured streaming.

The design which I thought is as follows:
1. In kafka Spark structures streaming, read the message containing the data 
path.
2. Collect the message record in driver. (Messages are small in sizes)
3. Create the dataframe from the datalocation.


kafkaDf.select($"value".cast(StringType))
  .writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) =>  {

//rough code

//collec to driver

val records = batchDf.collect()

//create dataframe and process
records foreach((rec: Row) =>{
  println("records:##",rec.toString())
  val path = rec.getAs[String]("data_path")

  val dfToProcess =spark.read.json(path)

  

})

}

I would like to know the views, if this approach is fine? Specifically if there 
is some problem with

with creating the dataframe after calling collect.

If there is any better approach, please let know the same.



Regards

Amit Joshi


Re: [Spark Structured Streaming] Processing the data path coming from kafka.

2021-01-18 Thread Brian Wylie
Coming in late... but if I understand correctly, you can simply use the fact
that spark.read (or readStream) will also accept a directory argument. If
you provide a directory, Spark will automagically pull in all the files in
that directory.

"""Reading in multiple files example"""
spark = SparkSession.builder.master('local[*]').appName('spark_streaming').getOrCreate()

# Schema for incoming data
json_schema = StructType([StructField("username", StringType(), True),
  StructField("name", StringType(), True),
  StructField("sex", StringType(), True),
  StructField("address", StringType(), True),
  StructField("mail", StringType(), True),
  StructField("birthdate", DateType(), True),
  StructField("work", StringType(), True),
  StructField("salary", IntegerType(), True),
  StructField("timestamp", TimestampType(), True)])

# Read in a bunch of data files (files are in JSON-per-line format)
data_directory_path = './data/my_directory'


# Create a Spark DF with a bunch of files
spark_df = spark.read.schema(json_schema).json(data_directory_path)




On Mon, Jan 18, 2021 at 11:22 AM Amit Joshi 
wrote:

> Hi Boris,
>
> Thanks for your code block.
> I understood what you are trying to achieve in the code.
>
> But the contents of the file are JSON records separated by newlines.
> And we have to make the dataframe out of it, as some processing has to be
> done on it.
>
> Regards
> Amit
> On Monday, January 18, 2021, Boris Litvak  wrote:
>
>> HI Amit,
>>
>>
>>
>> I was thinking along the lines of (python):
>>
>>
>>
>>
>> @udf(returnType=StringType())
>> def reader_udf(filename: str) -> str:
>> with open(filename, "r") as f:
>> return f.read()
>>
>>
>> def run_locally():
>> with utils.build_spark_session("Local", local=True) as spark:
>> df = spark.readStream.csv(r'testdata', schema
>> =StructType([StructField('filename', StringType(), True)]))
>> df = df.withColumn('content', reader_udf(col('filename')))
>> q = df.select('content').writeStream.queryName('test').format(
>> 'console').start()
>> q.awaitTermination()
>>
>>
>>
>> Now each row contains the contents of the files, provided they are not
>> large you can foreach() over the df/rdd and do whatever you want with it,
>> such as json.loads()/etc.
>>
>> If you know the schema of the jsons, you can later explode() them into a
>> flat DF, ala
>> https://stackoverflow.com/questions/38243717/spark-explode-nested-json-with-array-in-scala
>>
>>
>>
>> Note that unless I am missing something you cannot access spark session
>> from foreach as code is not running on the driver.
>>
>> Please say if it makes sense or did I miss anything.
>>
>>
>>
>> Boris
>>
>>
>>
>> *From:* Amit Joshi 
>> *Sent:* Monday, 18 January 2021 17:10
>> *To:* Boris Litvak 
>> *Cc:* spark-user 
>> *Subject:* Re: [Spark Structured Streaming] Processing the data path
>> coming from kafka.
>>
>>
>>
>> Hi Boris,
>>
>>
>>
>> I need to do processing on the data present in the path.
>>
>> That is the reason I am trying to make the dataframe.
>>
>>
>>
>> Can you please provide the example of your solution?
>>
>>
>>
>> Regards
>>
>> Amit
>>
>>
>>
>> On Mon, Jan 18, 2021 at 7:15 PM Boris Litvak 
>> wrote:
>>
>> Hi Amit,
>>
>>
>>
>> Why won’t you just map()/mapXXX() the kafkaDf with the mapping function
>> that reads the paths?
>>
>> Also, do you really have to read the json into an additional dataframe?
>>
>>
>>
>> Thanks, Boris
>>
>>
>>
>> *From:* Amit Joshi 
>> *Sent:* Monday, 18 January 2021 15:04
>> *To:* spark-user 
>> *Subject:* [Spark Structured Streaming] Processing the data path coming
>> from kafka.
>>
>>
>>
>> Hi ,
>>
>>
>>
>> I have a use case where the file path of the json records stored in s3
>> are coming as a kafka
>>
>> message in kafka. I have to process the data using spark structured
>> streaming.
>>
>>
>>
>> The design which I thought is as follows:
>>
>> 1. In kafka Spark structures streaming, read the message containing the
>> data path.
>>
>> 2. Collect the message record in driver. (Messages are small in sizes)
>>
>> 3. Create the dataframe from the datalocation.
>>
>>
>>
>> *kafkaDf*.select(*$"value"*.cast(StringType))
>>   .writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) =>  {
>>
>> //rough code
>>
>> //collec to driver
>>
>> *val *records = batchDf.collect()
>>
>> //create dataframe and process
>> records foreach((rec: Row) =>{
>>   *println*(*"records:##"*,rec.toString())
>>   val path = rec.getAs[String](*"data_path"*)
>>
>>   val dfToProcess =spark.read.json(path)
>>
>>   
>>
>> })
>>
>> }
>>
>> I would like to know the views, if this approach is fine? Specifically if 
>> there is some problem with
>>
>> with creating the dataframe after calling collect.
>>
>> If there is any better approach, please let know the 

Re: [Spark Structured Streaming] Processing the data path coming from kafka.

2021-01-18 Thread Amit Joshi
Hi Boris,

Thanks for your code block.
I understood what you are trying to achieve in the code.

But the contents of the file are JSON records separated by newlines.
And we have to make the dataframe out of it, as some processing has to be
done on it.

Regards
Amit
On Monday, January 18, 2021, Boris Litvak  wrote:

> HI Amit,
>
>
>
> I was thinking along the lines of (python):
>
>
>
>
> @udf(returnType=StringType())
> def reader_udf(filename: str) -> str:
> with open(filename, "r") as f:
> return f.read()
>
>
> def run_locally():
> with utils.build_spark_session("Local", local=True) as spark:
> df = spark.readStream.csv(r'testdata', schema=StructType([
> StructField('filename', StringType(), True)]))
> df = df.withColumn('content', reader_udf(col('filename')))
> q = df.select('content').writeStream.queryName('test').format(
> 'console').start()
> q.awaitTermination()
>
>
>
> Now each row contains the contents of the files, provided they are not
> large you can foreach() over the df/rdd and do whatever you want with it,
> such as json.loads()/etc.
>
> If you know the schema of the jsons, you can later explode() them into a
> flat DF, ala https://stackoverflow.com/questions/38243717/spark-
> explode-nested-json-with-array-in-scala
>
>
>
> Note that unless I am missing something you cannot access spark session
> from foreach as code is not running on the driver.
>
> Please say if it makes sense or did I miss anything.
>
>
>
> Boris
>
>
>
> *From:* Amit Joshi 
> *Sent:* Monday, 18 January 2021 17:10
> *To:* Boris Litvak 
> *Cc:* spark-user 
> *Subject:* Re: [Spark Structured Streaming] Processing the data path
> coming from kafka.
>
>
>
> Hi Boris,
>
>
>
> I need to do processing on the data present in the path.
>
> That is the reason I am trying to make the dataframe.
>
>
>
> Can you please provide the example of your solution?
>
>
>
> Regards
>
> Amit
>
>
>
> On Mon, Jan 18, 2021 at 7:15 PM Boris Litvak  wrote:
>
> Hi Amit,
>
>
>
> Why won’t you just map()/mapXXX() the kafkaDf with the mapping function
> that reads the paths?
>
> Also, do you really have to read the json into an additional dataframe?
>
>
>
> Thanks, Boris
>
>
>
> *From:* Amit Joshi 
> *Sent:* Monday, 18 January 2021 15:04
> *To:* spark-user 
> *Subject:* [Spark Structured Streaming] Processing the data path coming
> from kafka.
>
>
>
> Hi ,
>
>
>
> I have a use case where the file path of the json records stored in s3 are
> coming as a kafka
>
> message in kafka. I have to process the data using spark structured
> streaming.
>
>
>
> The design which I thought is as follows:
>
> 1. In kafka Spark structures streaming, read the message containing the
> data path.
>
> 2. Collect the message record in driver. (Messages are small in sizes)
>
> 3. Create the dataframe from the datalocation.
>
>
>
> *kafkaDf*.select(*$"value"*.cast(StringType))
>   .writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) =>  {
>
> //rough code
>
> //collec to driver
>
> *val *records = batchDf.collect()
>
> //create dataframe and process
> records foreach((rec: Row) =>{
>   *println*(*"records:##"*,rec.toString())
>   val path = rec.getAs[String](*"data_path"*)
>
>   val dfToProcess =spark.read.json(path)
>
>   
>
> })
>
> }
>
> I would like to know the views, if this approach is fine? Specifically if 
> there is some problem with
>
> with creating the dataframe after calling collect.
>
> If there is any better approach, please let know the same.
>
>
>
> Regards
>
> Amit Joshi
>
>


RE: [Spark Structured Streaming] Processing the data path coming from kafka.

2021-01-18 Thread Boris Litvak
HI Amit,

I was thinking along the lines of (python):


@udf(returnType=StringType())
def reader_udf(filename: str) -> str:
with open(filename, "r") as f:
return f.read()


def run_locally():
with utils.build_spark_session("Local", local=True) as spark:
df = spark.readStream.csv(r'testdata', 
schema=StructType([StructField('filename', StringType(), True)]))
df = df.withColumn('content', reader_udf(col('filename')))
q = 
df.select('content').writeStream.queryName('test').format('console').start()
q.awaitTermination()

Now each row contains the contents of the files, provided they are not large 
you can foreach() over the df/rdd and do whatever you want with it, such as 
json.loads()/etc.
If you know the schema of the jsons, you can later explode() them into a flat 
DF, ala 
https://stackoverflow.com/questions/38243717/spark-explode-nested-json-with-array-in-scala

Note that, unless I am missing something, you cannot access the Spark session from 
foreach(), as the code is not running on the driver.
Please say if this makes sense, or whether I missed anything.

Boris

From: Amit Joshi 
Sent: Monday, 18 January 2021 17:10
To: Boris Litvak 
Cc: spark-user 
Subject: Re: [Spark Structured Streaming] Processing the data path coming from 
kafka.

Hi Boris,

I need to do processing on the data present in the path.
That is the reason I am trying to make the dataframe.

Can you please provide the example of your solution?

Regards
Amit

On Mon, Jan 18, 2021 at 7:15 PM Boris Litvak 
mailto:boris.lit...@skf.com>> wrote:
Hi Amit,

Why won’t you just map()/mapXXX() the kafkaDf with the mapping function that 
reads the paths?
Also, do you really have to read the json into an additional dataframe?

Thanks, Boris

From: Amit Joshi mailto:mailtojoshia...@gmail.com>>
Sent: Monday, 18 January 2021 15:04
To: spark-user mailto:user@spark.apache.org>>
Subject: [Spark Structured Streaming] Processing the data path coming from 
kafka.

Hi ,

I have a use case where the file path of the json records stored in s3 are 
coming as a kafka
message in kafka. I have to process the data using spark structured streaming.

The design which I thought is as follows:
1. In kafka Spark structures streaming, read the message containing the data 
path.
2. Collect the message record in driver. (Messages are small in sizes)
3. Create the dataframe from the datalocation.


kafkaDf.select($"value".cast(StringType))
  .writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) =>  {

//rough code

//collec to driver

val records = batchDf.collect()

//create dataframe and process
records foreach((rec: Row) =>{
  println("records:##",rec.toString())
  val path = rec.getAs[String]("data_path")

  val dfToProcess =spark.read.json(path)

  

})

}

I would like to know the views, if this approach is fine? Specifically if there 
is some problem with

with creating the dataframe after calling collect.

If there is any better approach, please let know the same.



Regards

Amit Joshi


Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread German Schiavon
Hi,

This is the jira  and
regarding the repo, I believe you can just commit it to your personal repo and that
should be it.

Regards

On Mon, 18 Jan 2021 at 15:46, Eric Beabes  wrote:

> Sorry. Can you please tell me where to create the JIRA? Also is there any
> specific Github repository I need to commit code into - OR - just in our
> own? Please let me know. Thanks.
>
> On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi 
> wrote:
>
>> Thank you. As we've asked, could you please create a JIRA and commit the
>> code to GitHub?
>> It would speed things up a lot.
>>
>> G
>>
>>
>> On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes 
>> wrote:
>>
>>> Here's a very simple reproducer app. I've attached 3 files:
>>> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
>>> email as well:
>>>
>>> package com.myorg
>>>
>>> import org.apache.hadoop.conf.Configuration
>>> import org.apache.hadoop.fs.{FileSystem, Path}
>>> import org.apache.hadoop.security.UserGroupInformation
>>> import org.apache.kafka.clients.producer.ProducerConfig
>>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
>>> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>>>
>>> import scala.util.{Failure, Success, Try}
>>>
>>> object Spark3Test {
>>>
>>>   val isLocal = false
>>>
>>>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>>>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>>>
>>>   val START_DATE_INDEX = 21
>>>   val END_DATE_INDEX = 40
>>>
>>>   def main(args: Array[String]) {
>>>
>>> val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", 
>>> isLocal)
>>> spark.sparkContext.setLogLevel("WARN")
>>>
>>> readKafkaStream(spark)
>>>   .groupByKey(row => {
>>> row.substring(START_DATE_INDEX, END_DATE_INDEX)
>>>   })
>>>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>> updateAcrossEvents
>>>   )
>>>   .filter(row => !row.inProgress)
>>>   .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>>>   .writeStream
>>>   .format("kafka")
>>>   .option(
>>> s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
>>> "10.29.42.141:9092"
>>> //"localhost:9092"
>>>   )
>>>   .option("topic", "spark3test")
>>>   .option("checkpointLocation", "/tmp/checkpoint_5")
>>>   .outputMode("update")
>>>   .start()
>>> manageStreamingQueries(spark)
>>>   }
>>>
>>>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>>>
>>> val stream = sparkSession.readStream
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>>>   .option("subscribe", "inputTopic")
>>>   .option("startingOffsets", "latest")
>>>   .option("failOnDataLoss", "false")
>>>   .option("kafkaConsumer.pollTimeoutMs", "12")
>>>   .load()
>>>   .selectExpr("CAST(value AS STRING)")
>>>   .as[String](Encoders.STRING)
>>> stream
>>>   }
>>>
>>>   def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: 
>>> GroupState[MyState]): MyState = {
>>> if (!oldState.exists) {
>>>   println(key)
>>>   val state = MyState(key)
>>>   oldState.update(state)
>>>   oldState.setTimeoutDuration("1 minutes")
>>>   oldState.get
>>> } else {
>>>   if (oldState.hasTimedOut) {
>>> oldState.get.inProgress = false
>>> val state = oldState.get
>>> println("State timed out for key: " + state.dateTime)
>>> oldState.remove()
>>> state
>>>   } else {
>>> val state = oldState.get
>>> state.count = state.count + 1
>>> oldState.update(state)
>>> oldState.setTimeoutDuration("1 minutes")
>>> oldState.get
>>>   }
>>> }
>>>   }
>>>
>>>   def initializeSparkSession(applicationName: String, isLocal: Boolean): 
>>> SparkSession = {
>>> UserGroupInformation.setLoginUser(
>>>   UserGroupInformation.createRemoteUser("hduser")
>>> )
>>>
>>> val builder = SparkSession
>>>   .builder()
>>>   .appName(applicationName)
>>>
>>> if (isLocal) {
>>>   builder.config("spark.master", "local[2]")
>>> }
>>>
>>> builder.getOrCreate()
>>>   }
>>>
>>>   def manageStreamingQueries(spark: SparkSession): Unit = {
>>>
>>> val sparkQueryListener = new QueryListener()
>>> spark.streams.addListener(sparkQueryListener)
>>>
>>> val shutdownMarker: String = "/tmp/stop_job"
>>>
>>> val timeoutInMilliSeconds = 6
>>> while (!spark.streams.active.isEmpty) {
>>>   Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
>>> case Success(result) =>
>>>   if (result) {
>>> println("A streaming query was terminated successfully")
>>> spark.streams.resetTerminated()
>>>   }
>>> case Failure(e) =>
>>>   

Re: [Spark Structured Streaming] Processing the data path coming from kafka.

2021-01-18 Thread Amit Joshi
Hi Boris,

I need to do processing on the data present in the path.
That is the reason I am trying to make the dataframe.

Can you please provide the example of your solution?

Regards
Amit

On Mon, Jan 18, 2021 at 7:15 PM Boris Litvak  wrote:

> Hi Amit,
>
>
>
> Why won’t you just map()/mapXXX() the kafkaDf with the mapping function
> that reads the paths?
>
> Also, do you really have to read the json into an additional dataframe?
>
>
>
> Thanks, Boris
>
>
>
> *From:* Amit Joshi 
> *Sent:* Monday, 18 January 2021 15:04
> *To:* spark-user 
> *Subject:* [Spark Structured Streaming] Processing the data path coming
> from kafka.
>
>
>
> Hi ,
>
>
>
> I have a use case where the file path of the json records stored in s3 are
> coming as a kafka
>
> message in kafka. I have to process the data using spark structured
> streaming.
>
>
>
> The design which I thought is as follows:
>
> 1. In kafka Spark structures streaming, read the message containing the
> data path.
>
> 2. Collect the message record in driver. (Messages are small in sizes)
>
> 3. Create the dataframe from the datalocation.
>
>
>
> *kafkaDf*.select(*$"value"*.cast(StringType))
>   .writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) =>  {
>
> //rough code
>
> //collec to driver
>
> *val *records = batchDf.collect()
>
> //create dataframe and process
> records foreach((rec: Row) =>{
>   *println*(*"records:##"*,rec.toString())
>   val path = rec.getAs[String](*"data_path"*)
>
>   val dfToProcess =spark.read.json(path)
>
>   
>
> })
>
> }
>
> I would like to know the views, if this approach is fine? Specifically if 
> there is some problem with
>
> with creating the dataframe after calling collect.
>
> If there is any better approach, please let know the same.
>
>
>
> Regards
>
> Amit Joshi
>
>


Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Eric Beabes
Sorry. Can you please tell me where to create the JIRA? Also, is there any
specific GitHub repository I need to commit the code into, or just our
own? Please let me know. Thanks.

On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi 
wrote:

> Thank you. As we've asked, could you please create a JIRA and commit the
> code to GitHub?
> It would speed things up a lot.
>
> G
>
>
> On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes 
> wrote:
>
>> Here's a very simple reproducer app. I've attached 3 files:
>> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
>> email as well:
>>
>> package com.myorg
>>
>> import org.apache.hadoop.conf.Configuration
>> import org.apache.hadoop.fs.{FileSystem, Path}
>> import org.apache.hadoop.security.UserGroupInformation
>> import org.apache.kafka.clients.producer.ProducerConfig
>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
>> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>>
>> import scala.util.{Failure, Success, Try}
>>
>> object Spark3Test {
>>
>>   val isLocal = false
>>
>>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>>
>>   val START_DATE_INDEX = 21
>>   val END_DATE_INDEX = 40
>>
>>   def main(args: Array[String]) {
>>
>> val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", 
>> isLocal)
>> spark.sparkContext.setLogLevel("WARN")
>>
>> readKafkaStream(spark)
>>   .groupByKey(row => {
>> row.substring(START_DATE_INDEX, END_DATE_INDEX)
>>   })
>>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>> updateAcrossEvents
>>   )
>>   .filter(row => !row.inProgress)
>>   .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>>   .writeStream
>>   .format("kafka")
>>   .option(
>> s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
>> "10.29.42.141:9092"
>> //"localhost:9092"
>>   )
>>   .option("topic", "spark3test")
>>   .option("checkpointLocation", "/tmp/checkpoint_5")
>>   .outputMode("update")
>>   .start()
>> manageStreamingQueries(spark)
>>   }
>>
>>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>>
>> val stream = sparkSession.readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>>   .option("subscribe", "inputTopic")
>>   .option("startingOffsets", "latest")
>>   .option("failOnDataLoss", "false")
>>   .option("kafkaConsumer.pollTimeoutMs", "12")
>>   .load()
>>   .selectExpr("CAST(value AS STRING)")
>>   .as[String](Encoders.STRING)
>> stream
>>   }
>>
>>   def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: 
>> GroupState[MyState]): MyState = {
>> if (!oldState.exists) {
>>   println(key)
>>   val state = MyState(key)
>>   oldState.update(state)
>>   oldState.setTimeoutDuration("1 minutes")
>>   oldState.get
>> } else {
>>   if (oldState.hasTimedOut) {
>> oldState.get.inProgress = false
>> val state = oldState.get
>> println("State timed out for key: " + state.dateTime)
>> oldState.remove()
>> state
>>   } else {
>> val state = oldState.get
>> state.count = state.count + 1
>> oldState.update(state)
>> oldState.setTimeoutDuration("1 minutes")
>> oldState.get
>>   }
>> }
>>   }
>>
>>   def initializeSparkSession(applicationName: String, isLocal: Boolean): 
>> SparkSession = {
>> UserGroupInformation.setLoginUser(
>>   UserGroupInformation.createRemoteUser("hduser")
>> )
>>
>> val builder = SparkSession
>>   .builder()
>>   .appName(applicationName)
>>
>> if (isLocal) {
>>   builder.config("spark.master", "local[2]")
>> }
>>
>> builder.getOrCreate()
>>   }
>>
>>   def manageStreamingQueries(spark: SparkSession): Unit = {
>>
>> val sparkQueryListener = new QueryListener()
>> spark.streams.addListener(sparkQueryListener)
>>
>> val shutdownMarker: String = "/tmp/stop_job"
>>
>> val timeoutInMilliSeconds = 6
>> while (!spark.streams.active.isEmpty) {
>>   Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
>> case Success(result) =>
>>   if (result) {
>> println("A streaming query was terminated successfully")
>> spark.streams.resetTerminated()
>>   }
>> case Failure(e) =>
>>   println("Query failed with message: " + e.getMessage)
>>   e.printStackTrace()
>>   spark.streams.resetTerminated()
>>   }
>>
>>   if (checkMarker(shutdownMarker)) {
>> spark.streams.active.foreach(query => {
>>   println(s"Stopping streaming query: ${query.id}")
>>   query.stop()
>> })
>> spark.stop()
>> 

Re: Correctness bug on Shuffle+Repartition scenario

2021-01-18 Thread 王长春
Hi Shiao-An Yuan
I also found this correctness problem in my production environment.
My Spark version is 2.3.1. I previously thought it was caused by SPARK-23243.
But you said you also have this problem in your environment, and your version is
2.4.4, which already contains the fix for SPARK-23243. So maybe this problem is
not caused by SPARK-23243.

As you said, if it is caused by `first` before `repartition`, how can this problem
be solved fundamentally? And is there any workaround?
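
For reference, a minimal sketch of the workaround mentioned further down this
thread (switching line (1) from a round-robin `repartition` to a key-based
repartition, or using `coalesce`, so a retried stage reproduces the same
distribution); `newSnapshot`, `newPath`, and the `pkey` column come from the
quoted example:

```scala
// Workaround sketch: hash-partition by the key instead of round-robin, so the
// shuffle output is deterministic even if an upstream `first` is non-deterministic.
// Requires: import spark.implicits._ for the $-column syntax.
newSnapshot
  .repartition(60, $"pkey")   // instead of .repartition(60); coalesce(60) also avoids the issue
  .write.parquet(newPath)
```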


> On Jan 18, 2021, at 10:35 AM, Shiao-An Yuan wrote:
> 
> Hi, 
> I am using Spark 2.4.4 standalone mode.
> 
> On Mon, Jan 18, 2021 at 4:26 AM Sean Owen  > wrote:
> Hm, FWIW I can't reproduce that on Spark 3.0.1. What version are you using?
> 
> On Sun, Jan 17, 2021 at 6:22 AM Shiao-An Yuan  > wrote:
> Hi folks,
> 
> I finally found the root cause of this issue.
> It can be easily reproduced by the following code.
> We ran it on a standalone mode 4 cores * 4 instances (total 16 cores) 
> environment.
> 
> ```
> import org.apache.spark.TaskContext
> import scala.sys.process._
> import org.apache.spark.sql.functions._
> import com.google.common.hash.Hashing
> val murmur3 = Hashing.murmur3_32()
> 
> // create a Dataset with the cardinality of the second element equals 5.
> val ds = spark.range(0, 10, 1, 130).map(i => 
> (murmur3.hashLong(i).asInt(), i/2))
> 
> ds.groupByKey(_._2)
>   .agg(first($"_1").as[Long])
>   .repartition(200)
>   .map { x =>
> if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId == 
> 100 && TaskContext.get.stageAttemptNumber == 0) {
>   throw new Exception("pkill -f CoarseGrainedExecutorBackend".!!)
> }
> x
>   }
>   .map(_._2).distinct().count()   // the correct result is 5, but we 
> always got fewer number
> ```
> 
> The problem here is that SPARK-23207 uses sorting to make RoundRobinPartitioning
> always generate the same distribution,
> but the UDAF `first` may return non-deterministic results, which makes the
> sort result non-deterministic.
> Therefore, the first stage and the retry stage might produce different
> distributions, causing duplication and loss.
> 
> Thanks,
> Shiao-An Yuan
> 
> On Tue, Dec 29, 2020 at 10:00 PM Shiao-An Yuan  > wrote:
> Hi folks,
> 
> We recently identified a data correctness issue in our pipeline.
> 
> The data processing flow is as follows:
> 1. read the current snapshot (provide empty if it doesn't exist yet)
> 2. read unprocessed new data
> 3. union them and do a `reduceByKey` operation
> 4. output a new version of the snapshot
> 5. repeat step 1~4
> 
> The simplified version of code:
> ```
> // schema
> case class Log(pkey: Array[Byte], a: String, b: Int, /* 100+ columns */)
> 
> // function for reduce
> def merge(left: Log, right: Log): Log = {
>   Log(pkey = left.pkey
>   a= if (left.a!=null) left.a else right.a,
>   b= if (left.a!=null) left.b else right.b,
>   ...
>   )
> }
> 
> // a very large parquet file (>10G, 200 partitions)
> val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]   
> 
> // multiple small parquet files
> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]
> 
> val newSnapshot = currentSnapshot.union(newAddedLog)
>   .groupByKey(new String(pkey))  // generate key
>   .reduceGroups(_.merge(_))// 
> spark.sql.shuffle.partitions=200
>   .map(_._2) // drop key
> 
> newSnapshot
>   .repartition(60)  // (1)
>   .write.parquet(newPath)
> ```
> 
> The issue we have is that some data were duplicated or lost, and the amount of
> duplicated and loss data are similar.
> 
> We also noticed that this situation only happens if some instances got
> preempted. Spark will retry the stage, so some of the partitioned files are
> generated at the 1st time, and other files are generated at the 2nd(retry) 
> time.
> Moreover, those duplicated logs will be duplicated exactly twice and located 
> in
> both batches (one in the first batch; and one in the second batch).
> 
> The input/output files are parquet on GCS. The Spark version is 2.4.4 with
> standalone deployment. Workers running on GCP preemptible instances and they
> being preempted very frequently.
> 
> The pipeline is running in a single long-running process with multi-threads,
> each snapshot represent an "hour" of data, and we do the "read-reduce-write" 
> operations
> on multiple snapshots(hours) simultaneously. We pretty sure the same
> snapshot(hour) never process parallelly and the output path always
> generated with a timestamp, so those jobs shouldn't affect each other.
> 
> After changing the line (1) to `coalesce` or `repartition(100, $"pkey")` the 
> issue
> was gone, but I believe there is still a correctness bug that hasn't been 
> reported yet.
> 
> We have tried to reproduce this bug on a smaller scale but haven't succeeded 
> yet. I
> have 

RE: [Spark Structured Streaming] Processing the data path coming from kafka.

2021-01-18 Thread Boris Litvak
Hi Amit,

Why won’t you just map()/mapXXX() the kafkaDf with the mapping function that 
reads the paths?
Also, do you really have to read the json into an additional dataframe?

Thanks, Boris

From: Amit Joshi 
Sent: Monday, 18 January 2021 15:04
To: spark-user 
Subject: [Spark Structured Streaming] Processing the data path coming from 
kafka.

Hi ,

I have a use case where the file path of the json records stored in s3 are 
coming as a kafka
message in kafka. I have to process the data using spark structured streaming.

The design which I thought is as follows:
1. In kafka Spark structures streaming, read the message containing the data 
path.
2. Collect the message record in driver. (Messages are small in sizes)
3. Create the dataframe from the datalocation.


kafkaDf.select($"value".cast(StringType))
  .writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) =>  {

//rough code

//collec to driver

val records = batchDf.collect()

//create dataframe and process
records foreach((rec: Row) =>{
  println("records:##",rec.toString())
  val path = rec.getAs[String]("data_path")

  val dfToProcess =spark.read.json(path)

  

})

}

I would like to know the views, if this approach is fine? Specifically if there 
is some problem with

with creating the dataframe after calling collect.

If there is any better approach, please let know the same.



Regards

Amit Joshi


Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Gabor Somogyi
Thank you. As we've asked, could you please create a JIRA and commit the
code to GitHub?
It would speed things up a lot.

G


On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes 
wrote:

> Here's a very simple reproducer app. I've attached 3 files:
> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
> email as well:
>
> package com.myorg
>
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.hadoop.security.UserGroupInformation
> import org.apache.kafka.clients.producer.ProducerConfig
> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>
> import scala.util.{Failure, Success, Try}
>
> object Spark3Test {
>
>   val isLocal = false
>
>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>
>   val START_DATE_INDEX = 21
>   val END_DATE_INDEX = 40
>
>   def main(args: Array[String]) {
>
> val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", 
> isLocal)
> spark.sparkContext.setLogLevel("WARN")
>
> readKafkaStream(spark)
>   .groupByKey(row => {
> row.substring(START_DATE_INDEX, END_DATE_INDEX)
>   })
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> updateAcrossEvents
>   )
>   .filter(row => !row.inProgress)
>   .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>   .writeStream
>   .format("kafka")
>   .option(
> s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
> "10.29.42.141:9092"
> //"localhost:9092"
>   )
>   .option("topic", "spark3test")
>   .option("checkpointLocation", "/tmp/checkpoint_5")
>   .outputMode("update")
>   .start()
> manageStreamingQueries(spark)
>   }
>
>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>
> val stream = sparkSession.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>   .option("subscribe", "inputTopic")
>   .option("startingOffsets", "latest")
>   .option("failOnDataLoss", "false")
>   .option("kafkaConsumer.pollTimeoutMs", "12")
>   .load()
>   .selectExpr("CAST(value AS STRING)")
>   .as[String](Encoders.STRING)
> stream
>   }
>
>   def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: 
> GroupState[MyState]): MyState = {
> if (!oldState.exists) {
>   println(key)
>   val state = MyState(key)
>   oldState.update(state)
>   oldState.setTimeoutDuration("1 minutes")
>   oldState.get
> } else {
>   if (oldState.hasTimedOut) {
> oldState.get.inProgress = false
> val state = oldState.get
> println("State timed out for key: " + state.dateTime)
> oldState.remove()
> state
>   } else {
> val state = oldState.get
> state.count = state.count + 1
> oldState.update(state)
> oldState.setTimeoutDuration("1 minutes")
> oldState.get
>   }
> }
>   }
>
>   def initializeSparkSession(applicationName: String, isLocal: Boolean): 
> SparkSession = {
> UserGroupInformation.setLoginUser(
>   UserGroupInformation.createRemoteUser("hduser")
> )
>
> val builder = SparkSession
>   .builder()
>   .appName(applicationName)
>
> if (isLocal) {
>   builder.config("spark.master", "local[2]")
> }
>
> builder.getOrCreate()
>   }
>
>   def manageStreamingQueries(spark: SparkSession): Unit = {
>
> val sparkQueryListener = new QueryListener()
> spark.streams.addListener(sparkQueryListener)
>
> val shutdownMarker: String = "/tmp/stop_job"
>
> val timeoutInMilliSeconds = 6
> while (!spark.streams.active.isEmpty) {
>   Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
> case Success(result) =>
>   if (result) {
> println("A streaming query was terminated successfully")
> spark.streams.resetTerminated()
>   }
> case Failure(e) =>
>   println("Query failed with message: " + e.getMessage)
>   e.printStackTrace()
>   spark.streams.resetTerminated()
>   }
>
>   if (checkMarker(shutdownMarker)) {
> spark.streams.active.foreach(query => {
>   println(s"Stopping streaming query: ${query.id}")
>   query.stop()
> })
> spark.stop()
> removeMarker(shutdownMarker)
>   }
> }
> assert(spark.streams.active.isEmpty)
> spark.streams.removeListener(sparkQueryListener)
>   }
>
>   def checkMarker(markerFile: String): Boolean = {
> val fs = FileSystem.get(new Configuration())
> fs.exists(new Path(markerFile))
>   }
>
>   def removeMarker(markerFile: String): Unit = {
> val fs = FileSystem.get(new Configuration())
>   

Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Eric Beabes
Here's a very simple reproducer app. I've attached 3 files:
SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
email as well:

package com.myorg

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

import scala.util.{Failure, Success, Try}

object Spark3Test {

  val isLocal = false

  implicit val stringEncoder: Encoder[String] = Encoders.STRING
  implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]

  val START_DATE_INDEX = 21
  val END_DATE_INDEX = 40

  def main(args: Array[String]) {

val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal)
spark.sparkContext.setLogLevel("WARN")

readKafkaStream(spark)
  .groupByKey(row => {
row.substring(START_DATE_INDEX, END_DATE_INDEX)
  })
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
updateAcrossEvents
  )
  .filter(row => !row.inProgress)
  .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
  .writeStream
  .format("kafka")
  .option(
s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
"10.29.42.141:9092"
//"localhost:9092"
  )
  .option("topic", "spark3test")
  .option("checkpointLocation", "/tmp/checkpoint_5")
  .outputMode("update")
  .start()
manageStreamingQueries(spark)
  }

  def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {

val stream = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "10.29.42.141:9092")
  .option("subscribe", "inputTopic")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("kafkaConsumer.pollTimeoutMs", "12")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String](Encoders.STRING)
stream
  }

  def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: GroupState[MyState]): MyState = {
if (!oldState.exists) {
  println(key)
  val state = MyState(key)
  oldState.update(state)
  oldState.setTimeoutDuration("1 minutes")
  oldState.get
} else {
  if (oldState.hasTimedOut) {
oldState.get.inProgress = false
val state = oldState.get
println("State timed out for key: " + state.dateTime)
oldState.remove()
state
  } else {
val state = oldState.get
state.count = state.count + 1
oldState.update(state)
oldState.setTimeoutDuration("1 minutes")
oldState.get
  }
}
  }

  def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = {
UserGroupInformation.setLoginUser(
  UserGroupInformation.createRemoteUser("hduser")
)

val builder = SparkSession
  .builder()
  .appName(applicationName)

if (isLocal) {
  builder.config("spark.master", "local[2]")
}

builder.getOrCreate()
  }

  def manageStreamingQueries(spark: SparkSession): Unit = {

val sparkQueryListener = new QueryListener()
spark.streams.addListener(sparkQueryListener)

val shutdownMarker: String = "/tmp/stop_job"

val timeoutInMilliSeconds = 6
while (!spark.streams.active.isEmpty) {
  Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
case Success(result) =>
  if (result) {
println("A streaming query was terminated successfully")
spark.streams.resetTerminated()
  }
case Failure(e) =>
  println("Query failed with message: " + e.getMessage)
  e.printStackTrace()
  spark.streams.resetTerminated()
  }

  if (checkMarker(shutdownMarker)) {
spark.streams.active.foreach(query => {
  println(s"Stopping streaming query: ${query.id}")
  query.stop()
})
spark.stop()
removeMarker(shutdownMarker)
  }
}
assert(spark.streams.active.isEmpty)
spark.streams.removeListener(sparkQueryListener)
  }

  def checkMarker(markerFile: String): Boolean = {
val fs = FileSystem.get(new Configuration())
fs.exists(new Path(markerFile))
  }

  def removeMarker(markerFile: String): Unit = {
val fs = FileSystem.get(new Configuration())
fs.delete(new Path(markerFile), true)
  }

}

case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)







package com.myorg

import org.apache.spark.sql.streaming.StreamingQueryListener

class QueryListener extends StreamingQueryListener {

  def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
if 

[Spark Structured Streaming] Processing the data path coming from kafka.

2021-01-18 Thread Amit Joshi
Hi,

I have a use case where the file paths of JSON records stored in S3 arrive as
Kafka messages. I have to process the data using Spark Structured Streaming.

The design I have in mind is as follows:
1. In Spark Structured Streaming, read the Kafka message containing the data path.
2. Collect the message records on the driver. (Messages are small in size.)
3. Create the DataFrame from the data location.

kafkaDf.select($"value".cast(StringType))
  .writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) =>  {

//rough code

//collec to driver

val records = batchDf.collect()

//create dataframe and process
records foreach((rec: Row) =>{
  println("records:##",rec.toString())
  val path = rec.getAs[String]("data_path")

  val dfToProcess =spark.read.json(path)

  

})

}

I would like to know your views on whether this approach is fine, specifically
whether there is some problem with creating the DataFrame after calling collect().

If there is a better approach, please let me know.


Regards

Amit Joshi
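
For reference, a minimal sketch (assuming `spark` is the active SparkSession and
`kafkaDf` is the Kafka source DataFrame) of the same idea with the driver-side
collect kept, but with all paths of a micro-batch read in a single
`spark.read.json(paths: _*)` call instead of one read per record:

```scala
import org.apache.spark.sql.{DataFrame, Encoders}

// The batch handler is declared as a plain Scala function value, which also
// sidesteps the foreachBatch overload ambiguity seen with Scala 2.12 lambdas.
val processBatch: (DataFrame, Long) => Unit = (batchDf, batchId) => {
  // Paths are small, so collecting them on the driver is cheap.
  val paths = batchDf.select("data_path").as[String](Encoders.STRING).collect()
  if (paths.nonEmpty) {
    // One distributed read for every file referenced in this micro-batch.
    val dfToProcess = spark.read.json(paths: _*)
    // ... processing of dfToProcess goes here ...
  }
}

kafkaDf.selectExpr("CAST(value AS STRING) AS data_path")
  .writeStream
  .foreachBatch(processBatch)
  .option("checkpointLocation", "/tmp/checkpoint_paths")   // hypothetical location
  .start()
```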


Re: [SparkSQL] Full Join Return Null Value For Function-Based Column

2021-01-18 Thread 刘 欢
Sorry, I figured out the reason. Closed.

From: 刘 欢
Date: Monday, January 18, 2021, 1:39 PM
To: "user@spark.apache.org"
Subject: [SparkSQL] Full Join Return Null Value For Function-Based Column

Hi All:
Here I have two tables:

Table A
name  | num
------+-----
tom   | 2
jerry | 3
jerry | 4
null  | null

Table B
name  | score
------+------
tom   | 12
jerry | 10
jerry | 8
null  | null

When I use spark.sql() to get results from A and B with the following SQL:


select
  a.name as aName,
  a.date,
  b.name as bName
from
(
select
  name,
  date_format(now(),'yyyy-MM-dd') AS date
from
  A
group by
  name
) a
FULL JOIN
(
select
  name
from
  B
group by
  name
) b
ON a.name = b.name

I got results containing an all-NULL-value row, like:

aName | date | bName
------+------+------
null  | null | null
…     | …    | …

Can anyone explain why the all-null-value row appears?
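
For what it's worth, a small self-contained illustration (my own example, not
from the thread) of why the all-null row shows up: in SQL, NULL never compares
equal to NULL, so the NULL-name group from each side of the FULL JOIN stays
unmatched and gets padded with nulls for the other side's columns:

```scala
import org.apache.spark.sql.SparkSession

object FullJoinNullDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("full-join-null").getOrCreate()
    import spark.implicits._

    Seq("tom", "jerry", null).toDF("name").createOrReplaceTempView("A")
    Seq("tom", "jerry", null).toDF("name").createOrReplaceTempView("B")

    // The two NULL-name groups never match each other, so each one shows up as an
    // unmatched row with nulls filled in for the other side's columns.
    spark.sql(
      """SELECT a.name AS aName, b.name AS bName
        |FROM (SELECT name FROM A GROUP BY name) a
        |FULL JOIN (SELECT name FROM B GROUP BY name) b
        |ON a.name = b.name""".stripMargin
    ).show()

    spark.stop()
  }
}
```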