Re: RecordTooLargeException in Spark *Structured* Streaming

2020-05-25 Thread Jungtaek Lim
Hi,

You need to add the prefix "kafka." to the configurations that should be
propagated to Kafka itself. Options without the prefix are consumed by the
Spark data source (the Kafka connector, in this case).

https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html#kafka-specific-configurations
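
For example, a minimal sketch based on the snippet quoted below (assuming your
df and config objects; the size value is illustrative, and other required sink
options such as the topic and checkpoint location are omitted):

import org.apache.kafka.clients.producer.ProducerConfig

// "kafka." + ProducerConfig.MAX_REQUEST_SIZE_CONFIG resolves to "kafka.max.request.size";
// Spark strips the "kafka." prefix and hands "max.request.size" to the Kafka producer.
df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", config.outputBootstrapServer)
  .option("kafka." + ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "2097152") // ~2 MB, larger than the 1169350-byte message
  .start()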

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Tue, May 26, 2020 at 6:42 AM Something Something <
mailinglist...@gmail.com> wrote:

> I keep getting this error message:
>
>
> *The message is 1169350 bytes when serialized which is larger than the
> maximum request size you have configured with the max.request.size
> configuration.*
>
>
>
> As indicated in other posts, I am trying to set the “max.request.size”
> configuration in the Producer as follows:
>
>
> .writeStream
>   .format("kafka")
>   .option(
>     "kafka.bootstrap.servers",
>     config.outputBootstrapServer
>   )
>   .option(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1000")
>
>
>
> But this is not working. Am I setting this correctly? Is there a different
> way to set this property under Spark Structured Streaming?
>
>
> Please help. Thanks.
>
>
>


Re: PySpark .collect() output to Scala Array[Row]

2020-05-25 Thread Wim Van Leuven
Looking at the stack trace, your data from Spark gets serialized to an
ArrayList (of something), whereas your Scala code expects an Array of Rows.
So the types don't line up. That's the exception you are seeing: the JVM
searches for a signature that simply does not exist.

Try to turn the Array into a java.util.ArrayList?
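
For example, something like this on the Scala side (a rough sketch of the
idea, not your project's actual code; whether the list elements arrive as JVM
Rows still depends on how the data is handed over from Python):

import java.{util => ju}
import scala.collection.JavaConverters._
import org.apache.spark.sql.Row

object WriteGraphML {
  def apply(data: ju.List[Row], graphmlPath: String): Boolean = {
    // Accept the java.util.ArrayList that Py4J sends and convert it back to Array[Row].
    val rows: Array[Row] = data.asScala.toArray
    // ... massage rows into GraphML and write it to graphmlPath ...
    true
  }
}
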
-w

On Tue, 26 May 2020 at 03:04, Nick Ruest  wrote:

> Hi,
>
> I've hit a wall with trying to implement a couple of Scala methods in a
> Python version of our project. I've implemented a number of these
> already, but I'm getting hung up with this one.
>
> My Python function looks like this:
>
> def Write_Graphml(data, graphml_path, sc):
>     return sc.getOrCreate()._jvm.io.archivesunleashed.app.WriteGraphML(data, graphml_path).apply
>
>
> Where data is a DataFrame that has been collected; data.collect().
>
> On the Scala side it is basically:
>
> object WriteGraphML {
>   def apply(data: Array[Row], graphmlPath: String): Boolean = {
> ...
>     // massages an Array[Row] into GraphML
> ...
>     true
>   }
> }
>
> When I try to use it in PySpark, I end up getting this error message:
>
> Py4JError: An error occurred while calling
> None.io.archivesunleashed.app.WriteGraphML. Trace:
> py4j.Py4JException: Constructor
> io.archivesunleashed.app.WriteGraphML([class java.util.ArrayList, class
> java.lang.String]) does not exist
> at
> py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
> at
> py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
> at py4j.Gateway.invoke(Gateway.java:237)
> at
>
> py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
> at
> py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
> at py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
>
>
> I originally dug into what the error message stated, and tried a variety
> of tweaks such as:
>
> sc.getOrCreate()._jvm.io.archivesunleashed.app.WriteGraphML.apply(data,
> graphml_path)
>
> And I went as far as trying get_attr, calling "WriteGraphML$", and a
> few other variations of that approach.
>
> All of these attempts produced some variation of the error message above:
> that the constructor or method does not exist.
>
> Based on lots of Googling and Stack Overflow searches I came across
> this [1], and it has me thinking that the problem is in how Py4J passes
> the Python list (data) to the JVM and then on to Scala: it ends up as an
> ArrayList instead of an Array[Row].
>
> Do I need to tweak data before it is passed to Write_Graphml? Or am I
> doing something else wrong here?
>
> I had originally posted a version of this message to the dev list, and
> Sean Owen suggested that WriteGraphML should be implemented as a class, not
> an object. Is that the right path? I have a number of other Scala
> functions implemented in the PySpark side of our project that are
> objects, and everything works fine.
>
> ...and is there a best practices guide or documentation for implementing
> Scala functions in PySpark? I've found a number of blog posts that have
> been helpful.
>
> Thanks in advance for any help!
>
> cheers!
>
> -nruest
>
> [1]
> https://stackoverflow.com/questions/61928886/pyspark-list-to-scala-sequence
>
>


Re: Using Spark Accumulators with Structured Streaming

2020-05-25 Thread Srinivas V
Hello,
For me too it comes out as 0 when I print it in onQueryProgress, and I use a
LongAccumulator as well. Yes, it prints correctly on my local setup but not on
the cluster. One consolation is that when I send the metrics to Grafana, the
values do show up there.
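
For reference, a minimal sketch of the pattern being discussed (a built-in,
thread-safe LongAccumulator read from a StreamingQueryListener, assuming Spark
2.x Structured Streaming APIs); the names here are illustrative, not from
anyone's actual job:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val spark = SparkSession.builder().appName("acc-sketch").getOrCreate()

// Register a built-in, thread-safe accumulator instead of a custom Map-based one.
val eventCount = spark.sparkContext.longAccumulator("MyAccumulator")

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    // Runs on the driver after each micro-batch and reads the driver-side value.
    println(s"eventCount = ${eventCount.value}")
})

// Inside the function passed to mapGroupsWithState, increment on the executors:
//   eventCount.add(1)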

On Tue, May 26, 2020 at 3:10 AM Something Something <
mailinglist...@gmail.com> wrote:

> No this is not working even if I use LongAccumulator.
>
> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>
>> There is a restriction in AccumulatorV2 API [1], the OUT type should be
>> atomic or thread safe. I'm wondering if the implementation for
>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
>> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
>> and test if the StreamingListener and other codes are able to work?
>>
>> ---
>> Cheers,
>> -z
>> [1]
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>> [2]
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>> [3]
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>
>> 
>> From: Something Something 
>> Sent: Saturday, May 16, 2020 0:38
>> To: spark-user
>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>
>> Can someone from Spark Development team tell me if this functionality is
>> supported and tested? I've spent a lot of time on this but can't get it to
>> work. Just to add more context, we've our own Accumulator class that
>> extends from AccumulatorV2. In this class we keep track of one or more
>> accumulators. Here's the definition:
>>
>>
>> class CollectionLongAccumulator[T]
>> extends AccumulatorV2[T, java.util.Map[T, Long]]
>>
>> When the job begins we register an instance of this class:
>>
>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>
>> Is this working under Structured Streaming?
>>
>> I will keep looking for alternate approaches but any help would be
>> greatly appreciated. Thanks.
>>
>>
>>
>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>> In my structured streaming job I am updating Spark Accumulators in the
>> updateAcrossEvents method but they are always 0 when I try to print them in
>> my StreamingListener. Here's the code:
>>
>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>> updateAcrossEvents
>>   )
>>
>>
>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>> StreamingListener which writes values of the accumulators in
>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>> ZERO!
>>
>> When I added log statements in the updateAcrossEvents, I could see that
>> these accumulators are getting incremented as expected.
>>
>> This only happens when I run in the 'Cluster' mode. In Local mode it
>> works fine which implies that the Accumulators are not getting distributed
>> correctly - or something like that!
>>
>> Note: I've seen quite a few answers on the Web that tell me to perform an
>> "Action". That's not a solution here. This is a 'Stateful Structured
>> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>>
>>
>>
>>


PySpark .collect() output to Scala Array[Row]

2020-05-25 Thread Nick Ruest
Hi,

I've hit a wall with trying to implement a couple of Scala methods in a
Python version of our project. I've implemented a number of these
already, but I'm getting hung up with this one.

My Python function looks like this:

def Write_Graphml(data, graphml_path, sc):
    return sc.getOrCreate()._jvm.io.archivesunleashed.app.WriteGraphML(data, graphml_path).apply


Where data is a DataFrame that has been collected; data.collect().

On the Scala side it is basically:

object WriteGraphML {
  def apply(data: Array[Row], graphmlPath: String): Boolean = {
...
    // massages an Array[Row] into GraphML
...
    true
  }
}

When I try to use it in PySpark, I end up getting this error message:

Py4JError: An error occurred while calling
None.io.archivesunleashed.app.WriteGraphML. Trace:
py4j.Py4JException: Constructor
io.archivesunleashed.app.WriteGraphML([class java.util.ArrayList, class
java.lang.String]) does not exist
at
py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
at
py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
at py4j.Gateway.invoke(Gateway.java:237)
at
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)


I originally dug into what the error message stated, and tried a variety
of tweaks such as:

sc.getOrCreate()._jvm.io.archivesunleashed.app.WriteGraphML.apply(data,
graphml_path)

And I went as far as trying get_attr, calling "WriteGraphML$", and a few
other variations of that approach.

All of these attempts produced some variation of the error message above:
that the constructor or method does not exist.

Based on lots of Googling and Stack Overflow searches I came across this [1],
and it has me thinking that the problem is in how Py4J passes the Python list
(data) to the JVM and then on to Scala: it ends up as an ArrayList instead of
an Array[Row].

Do I need to tweak data before it is passed to Write_Graphml? Or am I
doing something else wrong here?

I had originally posted a version of this message to the dev list, and
Sean Owen suggested that WriteGraphML should be implemented as a class, not
an object. Is that the right path? I have a number of other Scala
functions implemented in the PySpark side of our project that are
objects, and everything works fine.

...and is there a best practices guide or documentation for implementing
Scala functions in PySpark? I've found a number of blog posts that have
been helpful.

Thanks in advance for any help!

cheers!

-nruest

[1]
https://stackoverflow.com/questions/61928886/pyspark-list-to-scala-sequence





RecordTooLargeException in Spark *Structured* Streaming

2020-05-25 Thread Something Something
I keep getting this error message:


*The message is 1169350 bytes when serialized which is larger than the
maximum request size you have configured with the max.request.size
configuration.*



As indicated in other posts, I am trying to set the “max.request.size”
configuration in the Producer as follows:


.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    config.outputBootstrapServer
  )
  .option(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1000")



But this is not working. Am I setting this correctly? Is there a different
way to set this property under Spark Structured Streaming?


Please help. Thanks.


Re: Using Spark Accumulators with Structured Streaming

2020-05-25 Thread Something Something
No this is not working even if I use LongAccumulator.

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:

> There is a restriction in AccumulatorV2 API [1], the OUT type should be
> atomic or thread safe. I'm wondering if the implementation for
> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
> and test if the StreamingListener and other codes are able to work?
>
> ---
> Cheers,
> -z
> [1]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> [2]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> [3]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>
> 
> From: Something Something 
> Sent: Saturday, May 16, 2020 0:38
> To: spark-user
> Subject: Re: Using Spark Accumulators with Structured Streaming
>
> Can someone from Spark Development team tell me if this functionality is
> supported and tested? I've spent a lot of time on this but can't get it to
> work. Just to add more context, we've our own Accumulator class that
> extends from AccumulatorV2. In this class we keep track of one or more
> accumulators. Here's the definition:
>
>
> class CollectionLongAccumulator[T]
> extends AccumulatorV2[T, java.util.Map[T, Long]]
>
> When the job begins we register an instance of this class:
>
> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>
> Is this working under Structured Streaming?
>
> I will keep looking for alternate approaches but any help would be greatly
> appreciated. Thanks.
>
>
>
> On Thu, May 14, 2020 at 2:36 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
> In my structured streaming job I am updating Spark Accumulators in the
> updateAcrossEvents method but they are always 0 when I try to print them in
> my StreamingListener. Here's the code:
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> updateAcrossEvents
>   )
>
>
> The accumulators get incremented in 'updateAcrossEvents'. I've a
> StreamingListener which writes values of the accumulators in
> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
> ZERO!
>
> When I added log statements in the updateAcrossEvents, I could see that
> these accumulators are getting incremented as expected.
>
> This only happens when I run in the 'Cluster' mode. In Local mode it works
> fine which implies that the Accumulators are not getting distributed
> correctly - or something like that!
>
> Note: I've seen quite a few answers on the Web that tell me to perform an
> "Action". That's not a solution here. This is a 'Stateful Structured
> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>
>
>
>


Re: Spark API and immutability

2020-05-25 Thread Holden Karau
So even on RDDs, cache/persist mutate the RDD object. The important thing
for Spark is that the data represented in the RDD/DataFrame isn't mutated.
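
A quick illustration of the point (a minimal sketch, assuming an existing
SparkSession named spark and Spark 2.1+):

val df = spark.range(10).toDF("id")
val cached = df.cache()   // marks the same object for caching; no new DataFrame is created
println(cached eq df)     // true: cache() returns the very same Dataset instance
println(df.storageLevel)  // now reports the cache storage level (MEMORY_AND_DISK by default)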

On Mon, May 25, 2020 at 10:56 AM Chris Thomas 
wrote:

>
> The cache() method on the DataFrame API caught me out.
>
> Having learnt that DataFrames are built on RDDs and that RDDs are
> immutable, when I saw the statement df.cache() in our codebase I thought
> ‘This must be a bug, the result is not assigned, the statement will have no
> effect.’
>
> However, I’ve since learnt that the cache method actually mutates the
> DataFrame object*. The statement was valid after all.
>
> I understand that the underlying user data is immutable, but doesn’t
> mutating the DataFrame object make the API a little inconsistent and harder
> to reason about?
>
> Regards
>
> Chris
>
>
> * (as do the persist and rdd.setName methods. I expect there are others)
>
-- 
Twitter: https://twitter.com/holdenkarau
Books (Learning Spark, High Performance Spark, etc.):
https://amzn.to/2MaRAG9  
YouTube Live Streams: https://www.youtube.com/user/holdenkarau


Fwd: Spark API and immutability

2020-05-25 Thread Chris Thomas
The cache() method on the DataFrame API caught me out.

Having learnt that DataFrames are built on RDDs and that RDDs are
immutable, when I saw the statement df.cache() in our codebase I thought
‘This must be a bug, the result is not assigned, the statement will have no
effect.’

However, I’ve since learnt that the cache method actually mutates the
DataFrame object*. The statement was valid after all.

I understand that the underlying user data is immutable, but doesn’t
mutating the DataFrame object make the API a little inconsistent and harder
to reason about?

Regards

Chris


* (as do the persist and rdd.setName methods. I expect there are others)


Re: Arrow RecordBatches/Pandas Dataframes to (Arrow enabled) Spark Dataframe conversion in streaming fashion

2020-05-25 Thread Jorge Machado
Hey, from what I know you can try to union them: df.union(df2).

Not sure if this is what you need.

> On 25. May 2020, at 13:53, Tanveer Ahmad - EWI  wrote:
> 
> Hi all,
> 
> I need some help regarding Arrow RecordBatches/Pandas Dataframes to (Arrow 
> enabled) Spark Dataframe conversions.
> Here the example explains very well how to convert a single Pandas Dataframe 
> to Spark Dataframe [1]. 
> 
> But in my case, some external applications are generating Arrow RecordBatches 
> in my PySpark application in streaming fashion. Each time I receive an Arrow 
> RB, I want to transfer/append it to a Spark Dataframe. So is it possible to 
> create a Spark Dataframe initially from one Arrow RecordBatch and then start 
> appending many other in-coming Arrow RecordBatches to that Spark Dataframe 
> (like in streaming fashion)? Thanks!
> 
> I saw another example [2] in which all the Arrow RB are being converted to 
> Spark Dataframe but my case is little bit different than this.  
> 
> [1] https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html 
>  
> [2] https://gist.github.com/linar-jether/7dd61ed6fa89098ab9c58a1ab428b2b5 
> 
> 
> ---
> Regards,
> Tanveer Ahmad



Arrow RecordBatches/Pandas Dataframes to (Arrow enabled) Spark Dataframe conversion in streaming fashion

2020-05-25 Thread Tanveer Ahmad - EWI
Hi all,


I need some help regarding Arrow RecordBatches/Pandas Dataframes to (Arrow 
enabled) Spark Dataframe conversions.

The example here explains very well how to convert a single Pandas DataFrame
to a Spark DataFrame [1].


But in my case, some external applications are generating Arrow RecordBatches
in my PySpark application in streaming fashion. Each time I receive an Arrow
RB, I want to transfer/append it to a Spark DataFrame. So is it possible to
create a Spark DataFrame initially from one Arrow RecordBatch and then keep
appending the other incoming Arrow RecordBatches to that Spark DataFrame (in a
streaming fashion)? Thanks!


I saw another example [2] in which all the Arrow RBs are converted to a
Spark DataFrame, but my case is a little bit different from this.


[1] https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html

[2] https://gist.github.com/linar-jether/7dd61ed6fa89098ab9c58a1ab428b2b5

---
Regards,
Tanveer Ahmad




Re: Parallelising JDBC reads in spark

2020-05-25 Thread Manjunath Shetty H
Thanks Dhaval for the suggestion, but in the case I mentioned in the previous
mail data can still be missed, as the row number will change.


-
Manjunath

From: Dhaval Patel 
Sent: Monday, May 25, 2020 3:01 PM
To: Manjunath Shetty H 
Subject: Re: Parallelising JDBC reads in spark

If possible, set the watermark before reading the data: read the max of the
watermark column first, then add that bound to the query that reads the actual
data, e.g. watermark <= current_watermark.

It may query the DB twice; however, it will make sure you are not missing any records.

Regards
Dhaval
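
For what it's worth, a minimal sketch of that idea against Spark 1.6 (table,
column, connection and variable names below are illustrative, not from the
original thread):

import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Illustrative placeholders -- none of these names come from the thread above.
val sc = new SparkContext(new SparkConf().setAppName("jdbc-watermark"))
val hiveContext = new HiveContext(sc)
val jdbcUrl = "jdbc:sqlserver://..."
val jdbcOptions = new Properties() // user, password, driver, ...
val lastWatermark = "2020-05-25 00:00:00" // persisted from the previous 15-minute run
val noOfPartitions = 8
val upperBound = 1000000L // rough row-count estimate used to split row_num ranges

// 1) Pin the high-water mark with a cheap query before the main read.
val currentWatermark = hiveContext.read
  .jdbc(jdbcUrl, "(SELECT MAX(modified_ts) AS wm FROM my_table) t", jdbcOptions)
  .collect()(0).getTimestamp(0)

// 2) Read only rows between the two watermarks, so rows updated after the bound
//    cannot reshuffle ROW_NUMBER while the partitioned tasks are running.
val query =
  s"""(SELECT *, ROW_NUMBER() OVER (ORDER BY modified_ts) AS row_num
     |  FROM my_table
     |  WHERE modified_ts > '$lastWatermark'
     |    AND modified_ts <= '$currentWatermark') q""".stripMargin

val df = hiveContext.read
  .jdbc(jdbcUrl, query, "row_num", 1L, upperBound, noOfPartitions, jdbcOptions)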

On Mon, May 25, 2020 at 3:38 AM Manjunath Shetty H 
mailto:manjunathshe...@live.com>> wrote:
Thanks Georg for the suggestion, but at this point changing the design is not 
really the option.

Any other pointer would be helpful.


Thanks
Manjunath

From: Georg Heiler mailto:georg.kf.hei...@gmail.com>>
Sent: Monday, May 25, 2020 11:52 AM

To: Manjunath Shetty H 
mailto:manjunathshe...@live.com>>
Cc: Mike Artz mailto:michaelea...@gmail.com>>; user 
mailto:user@spark.apache.org>>
Subject: Re: Parallelising JDBC reads in spark

Well you seem to have performance and consistency problems. Using a CDC tool 
fitting for your database you might be able to fix both.
However, streaming the change events of the database log might be a bit more 
complicated. Tools like https://debezium.io/ could be useful - depending on 
your source database.

Best,
Georg

On Mon, 25 May 2020 at 08:16, Manjunath Shetty H
mailto:manjunathshe...@live.com>> wrote:
Hi Georg,

Thanks for the response, can please elaborate what do mean by change data 
capture ?

Thanks
Manjunath

From: Georg Heiler mailto:georg.kf.hei...@gmail.com>>
Sent: Monday, May 25, 2020 11:14 AM
To: Manjunath Shetty H 
mailto:manjunathshe...@live.com>>
Cc: Mike Artz mailto:michaelea...@gmail.com>>; user 
mailto:user@spark.apache.org>>
Subject: Re: Parallelising JDBC reads in spark

Why don't you apply proper change data capture?
This will be more complex though.

On Mon, 25 May 2020 at 07:38, Manjunath Shetty H
mailto:manjunathshe...@live.com>> wrote:
Hi Mike,

Thanks for the response.

Even with that flag set data miss can happen right ?. As the fetch is based on 
the last watermark (maximum timestamp of the row that last batch job fetched ), 
Take a scenario like this with table

a :  1
b :  2
c :  3
d :  4
f  :  6
g :  7
h :  8
e :  5


  *   a,b,c,d,e get picked by 1 task
  *   by the time second task starts, e has been updated, so the row order 
changes
  *   As f moves up, it will completely get missed in the fetch

Thanks
Manjunath


From: Mike Artz mailto:michaelea...@gmail.com>>
Sent: Monday, May 25, 2020 10:50 AM
To: Manjunath Shetty H 
mailto:manjunathshe...@live.com>>
Cc: user mailto:user@spark.apache.org>>
Subject: Re: Parallelising JDBC reads in spark

Does anything different happened when you set the isolationLevel to do Dirty 
Reads i.e. "READ_UNCOMMITTED"

On Sun, May 24, 2020 at 7:50 PM Manjunath Shetty H 
mailto:manjunathshe...@live.com>> wrote:
Hi,

We are writing a ETL pipeline using Spark, that fetch the data from SQL server 
in batch mode (every 15mins). Problem we are facing when we try to 
parallelising single table reads into multiple tasks without missing any data.

We have tried this,


  *   Use `ROW_NUMBER` window function in the SQL query
  *   Then do
  *

DataFrame df =
hiveContext
.read()
.jdbc(
,
query,
"row_num",
1,
,
noOfPartitions,
jdbcOptions);



The problem with this approach is if our tables get updated in between in SQL 
Server while tasks are still running then the `ROW_NUMBER` will change and we 
may miss some records.


Any approach to how to fix this issue ? . Any pointers will be helpful


Note: I am on spark 1.6


Thanks

Manjiunath Shetty


Re: Parallelising JDBC reads in spark

2020-05-25 Thread Manjunath Shetty H
Thanks Georg for the suggestion, but at this point changing the design is not
really an option.

Any other pointers would be helpful.


Thanks
Manjunath

From: Georg Heiler 
Sent: Monday, May 25, 2020 11:52 AM
To: Manjunath Shetty H 
Cc: Mike Artz ; user 
Subject: Re: Parallelising JDBC reads in spark

Well you seem to have performance and consistency problems. Using a CDC tool 
fitting for your database you might be able to fix both.
However, streaming the change events of the database log might be a bit more 
complicated. Tools like https://debezium.io/ could be useful - depending on 
your source database.

Best,
Georg

On Mon, 25 May 2020 at 08:16, Manjunath Shetty H
mailto:manjunathshe...@live.com>> wrote:
Hi Georg,

Thanks for the response, can please elaborate what do mean by change data 
capture ?

Thanks
Manjunath

From: Georg Heiler mailto:georg.kf.hei...@gmail.com>>
Sent: Monday, May 25, 2020 11:14 AM
To: Manjunath Shetty H 
mailto:manjunathshe...@live.com>>
Cc: Mike Artz mailto:michaelea...@gmail.com>>; user 
mailto:user@spark.apache.org>>
Subject: Re: Parallelising JDBC reads in spark

Why don't you apply proper change data capture?
This will be more complex though.

On Mon, 25 May 2020 at 07:38, Manjunath Shetty H
mailto:manjunathshe...@live.com>> wrote:
Hi Mike,

Thanks for the response.

Even with that flag set data miss can happen right ?. As the fetch is based on 
the last watermark (maximum timestamp of the row that last batch job fetched ), 
Take a scenario like this with table

a :  1
b :  2
c :  3
d :  4
f  :  6
g :  7
h :  8
e :  5


  *   a,b,c,d,e get picked by 1 task
  *   by the time second task starts, e has been updated, so the row order 
changes
  *   As f moves up, it will completely get missed in the fetch

Thanks
Manjunath


From: Mike Artz mailto:michaelea...@gmail.com>>
Sent: Monday, May 25, 2020 10:50 AM
To: Manjunath Shetty H 
mailto:manjunathshe...@live.com>>
Cc: user mailto:user@spark.apache.org>>
Subject: Re: Parallelising JDBC reads in spark

Does anything different happened when you set the isolationLevel to do Dirty 
Reads i.e. "READ_UNCOMMITTED"

On Sun, May 24, 2020 at 7:50 PM Manjunath Shetty H 
mailto:manjunathshe...@live.com>> wrote:
Hi,

We are writing a ETL pipeline using Spark, that fetch the data from SQL server 
in batch mode (every 15mins). Problem we are facing when we try to 
parallelising single table reads into multiple tasks without missing any data.

We have tried this,


  *   Use `ROW_NUMBER` window function in the SQL query
  *   Then do
  *

DataFrame df =
hiveContext
.read()
.jdbc(
,
query,
"row_num",
1,
,
noOfPartitions,
jdbcOptions);



The problem with this approach is if our tables get updated in between in SQL 
Server while tasks are still running then the `ROW_NUMBER` will change and we 
may miss some records.


Any approach to how to fix this issue ? . Any pointers will be helpful


Note: I am on spark 1.6


Thanks

Manjiunath Shetty


Re: Parallelising JDBC reads in spark

2020-05-25 Thread Georg Heiler
Well, you seem to have both performance and consistency problems. Using a CDC
tool that fits your database, you might be able to fix both.
However, streaming the change events from the database log might be a bit
more complicated. Tools like https://debezium.io/ could be useful -
depending on your source database.

Best,
Georg

On Mon, 25 May 2020 at 08:16, Manjunath Shetty H <
manjunathshe...@live.com> wrote:

> Hi Georg,
>
> Thanks for the response, can please elaborate what do mean by change data
> capture ?
>
> Thanks
> Manjunath
> --
> *From:* Georg Heiler 
> *Sent:* Monday, May 25, 2020 11:14 AM
> *To:* Manjunath Shetty H 
> *Cc:* Mike Artz ; user 
> *Subject:* Re: Parallelising JDBC reads in spark
>
> Why don't you apply proper change data capture?
> This will be more complex though.
>
On Mon, 25 May 2020 at 07:38, Manjunath Shetty H <
manjunathshe...@live.com> wrote:
>
> Hi Mike,
>
> Thanks for the response.
>
> Even with that flag set data miss can happen right ?. As the fetch is
> based on the last watermark (maximum timestamp of the row that last batch
> job fetched ), Take a scenario like this with table
>
> a :  1
> b :  2
> c :  3
> d :  4
> f  :  6
> g :  7
> h :  8
> e :  5
>
>
>- a,b,c,d,e get picked by 1 task
>- by the time second task starts, e has been updated, so the row order
>changes
>- As f moves up, it will completely get missed in the fetch
>
>
> Thanks
> Manjunath
>
> --
> *From:* Mike Artz 
> *Sent:* Monday, May 25, 2020 10:50 AM
> *To:* Manjunath Shetty H 
> *Cc:* user 
> *Subject:* Re: Parallelising JDBC reads in spark
>
> Does anything different happened when you set the isolationLevel to do
> Dirty Reads i.e. "READ_UNCOMMITTED"
>
> On Sun, May 24, 2020 at 7:50 PM Manjunath Shetty H <
> manjunathshe...@live.com> wrote:
>
> Hi,
>
> We are writing a ETL pipeline using Spark, that fetch the data from SQL
> server in batch mode (every 15mins). Problem we are facing when we try to
> parallelising single table reads into multiple tasks without missing any
> data.
>
> We have tried this,
>
>
>- Use `ROW_NUMBER` window function in the SQL query
>- Then do
>-
>
>DataFrame df =
>hiveContext
>.read()
>.jdbc(
>**,
>query,
>"row_num",
>1,
>,
>noOfPartitions,
>jdbcOptions);
>
>
>
> The problem with this approach is if our tables get updated in between in SQL 
> Server while tasks are still running then the `ROW_NUMBER` will change and we 
> may miss some records.
>
>
> Any approach to how to fix this issue ? . Any pointers will be helpful
>
>
> *Note*: I am on spark 1.6
>
>
> Thanks
>
> Manjiunath Shetty
>
>


Re: Parallelising JDBC reads in spark

2020-05-25 Thread Manjunath Shetty H
Hi Georg,

Thanks for the response. Can you please elaborate on what you mean by change
data capture?

Thanks
Manjunath

From: Georg Heiler 
Sent: Monday, May 25, 2020 11:14 AM
To: Manjunath Shetty H 
Cc: Mike Artz ; user 
Subject: Re: Parallelising JDBC reads in spark

Why don't you apply proper change data capture?
This will be more complex though.

On Mon, 25 May 2020 at 07:38, Manjunath Shetty H
mailto:manjunathshe...@live.com>> wrote:
Hi Mike,

Thanks for the response.

Even with that flag set data miss can happen right ?. As the fetch is based on 
the last watermark (maximum timestamp of the row that last batch job fetched ), 
Take a scenario like this with table

a :  1
b :  2
c :  3
d :  4
f  :  6
g :  7
h :  8
e :  5


  *   a,b,c,d,e get picked by 1 task
  *   by the time second task starts, e has been updated, so the row order 
changes
  *   As f moves up, it will completely get missed in the fetch

Thanks
Manjunath


From: Mike Artz mailto:michaelea...@gmail.com>>
Sent: Monday, May 25, 2020 10:50 AM
To: Manjunath Shetty H 
mailto:manjunathshe...@live.com>>
Cc: user mailto:user@spark.apache.org>>
Subject: Re: Parallelising JDBC reads in spark

Does anything different happened when you set the isolationLevel to do Dirty 
Reads i.e. "READ_UNCOMMITTED"

On Sun, May 24, 2020 at 7:50 PM Manjunath Shetty H 
mailto:manjunathshe...@live.com>> wrote:
Hi,

We are writing a ETL pipeline using Spark, that fetch the data from SQL server 
in batch mode (every 15mins). Problem we are facing when we try to 
parallelising single table reads into multiple tasks without missing any data.

We have tried this,


  *   Use `ROW_NUMBER` window function in the SQL query
  *   Then do
  *

DataFrame df =
hiveContext
.read()
.jdbc(
,
query,
"row_num",
1,
,
noOfPartitions,
jdbcOptions);



The problem with this approach is if our tables get updated in between in SQL 
Server while tasks are still running then the `ROW_NUMBER` will change and we 
may miss some records.


Any approach to how to fix this issue ? . Any pointers will be helpful


Note: I am on spark 1.6


Thanks

Manjiunath Shetty