[ spark-streaming ] - Data Locality issue

2020-02-04 Thread Karthik Srinivas
Hi,

I am using Spark 2.3.2 and I am running into data locality issues: even after
setting spark.locality.wait.rack=200, the locality level is always RACK_LOCAL.
Can someone help me with this?
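A minimal, untested sketch of passing these settings with explicit time units
(the values are examples only; as far as I can tell a bare number such as 200 is
read as milliseconds):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("locality-example")                 // illustrative app name
  .config("spark.locality.wait", "30s")        // base wait before dropping a locality level
  .config("spark.locality.wait.node", "30s")
  .config("spark.locality.wait.rack", "200s")  // explicit unit, unlike a bare 200
  .getOrCreate()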

Thank you


Re: Best way to read batch from Kafka and Offsets

2020-02-04 Thread Burak Yavuz
Do you really want to build all of that and open yourself to bugs when you
can just use foreachBatch? Here are your options:

1. Build it yourself

// Read the previously committed offsets from some store
val prevOffsets = readOffsets()
val latestOffsets = getOffsets()

// Note: the Kafka source's batch options are "startingOffsets" / "endingOffsets"
val df = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "...")   // plus your subscribe/topic option
  .option("startingOffsets", prevOffsets)
  .option("endingOffsets", latestOffsets)
  .load()
batchLogic(df)

saveOffsets(latestOffsets)

2. Structured Streaming + Trigger.Once + foreachBatch

spark.readStream.format("kafka").load().writeStream
  .foreachBatch((df, batchId) => batchLogic(df))
  .trigger(Trigger.Once())   // org.apache.spark.sql.streaming.Trigger
  .start()
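
And if the blocker is existing writer functions that were built for static
dataframes: the dataframe handed to foreachBatch is just an ordinary DataFrame
for that micro-batch, so such writers can usually be called unchanged. A short,
untested sketch (foreachBatch needs Spark 2.4+; myExistingStaticWriter, the
output path, and the Kafka options are placeholders):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

// Stand-in for whatever writer you already have for static dataframes.
def myExistingStaticWriter(df: DataFrame): Unit =
  df.write.mode("append").parquet("/tmp/output")

spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")  // placeholder
  .option("subscribe", "my-topic")                   // placeholder
  .load()
  .writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    // batchDf is a plain DataFrame for this micro-batch; batchId can be used
    // for logging or idempotency checks.
    myExistingStaticWriter(batchDf)
  }
  .trigger(Trigger.Once())
  .start()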

With Option (1), you're going to have to (re)solve:
 a) Tracking and consistency of offsets
 b) Potential topic partition mismatches
 c) Offsets that may have aged out due to retention
 d) Re-execution of jobs and data consistency. What if your job fails as
you're committing the offsets at the end, but the data was already stored?
Will your getOffsets method return the same offsets?
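
If you do end up going down route (1) anyway, here is a rough, untested sketch of
what the bookkeeping can look like when the offsets are kept in the Kafka source's
JSON format (e.g. {"my-topic":{"0":42,"1":17}}) in a small file on HDFS. The path
and helper shapes are illustrative only, `spark` is an active SparkSession, and
saveOffsets here takes the Kafka dataframe rather than getOffsets' value, so the
stored offsets reflect what was actually read:

import java.nio.charset.StandardCharsets
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.max

val offsetsFile = new Path("/checkpoints/my-kafka-job/next-offsets.json") // illustrative path
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

// Offsets saved by the previous run, or "earliest" on the very first run.
def readOffsets(): String =
  if (!fs.exists(offsetsFile)) "earliest"
  else {
    val in = fs.open(offsetsFile)
    try scala.io.Source.fromInputStream(in).mkString finally in.close()
  }

// For a batch read, getOffsets() can simply return "latest" for endingOffsets.
def getOffsets(): String = "latest"

// After the batch has been stored, persist (max offset + 1) per partition as the
// next starting point: startingOffsets is inclusive, so without the +1 the last
// record of every partition would be read again on the next run.
def saveOffsets(kafkaDf: DataFrame): Unit = {
  val next = kafkaDf.groupBy("topic", "partition").agg((max("offset") + 1).as("next")).collect()
  val json = next.groupBy(_.getString(0)).map { case (topic, rows) =>
    "\"" + topic + "\":{" + rows.map(r => "\"" + r.getInt(1) + "\":" + r.getLong(2)).mkString(",") + "}"
  }.mkString("{", ",", "}")
  val out = fs.create(offsetsFile, true) // a plain overwrite is not atomic, which is point (d) above
  try out.write(json.getBytes(StandardCharsets.UTF_8)) finally out.close()
}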

I'd rather not solve problems that other people have solved for me, but
ultimately the decision is yours to make.

Best,
Burak




On Tue, Feb 4, 2020 at 4:41 PM Ruijing Li  wrote:

> Thanks Anil, I think that’s the approach I will take.
>
> Hi Burak,
>
> That was a possibility to think about, but my team has custom dataframe
> writer functions we would like to use; unfortunately they were written with
> static dataframes in mind. I do see there is a foreachBatch write mode, but
> my thinking was that at that point it would be easier to read from Kafka in
> batch mode.
>
> Thanks,
> RJ
>
> On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz  wrote:
>
>> Hi Ruijing,
>>
>> Why do you not want to use structured streaming here? This is exactly what
>> structured streaming + Trigger.Once was built for, so that you don't have to
>> build that solution yourself.
>> You also get exactly-once semantics if you use the built-in sinks.
>>
>> Best,
>> Burak
>>
>> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni  wrote:
>>
>>> Hi Ruijing,
>>>
>>> We did the following to read Kafka in batch from Spark:
>>>
>>> 1) Maintain the start offset (could be a DB, a file, etc.)
>>> 2) Get the end offset dynamically when the job executes.
>>> 3) Pass the start and end offsets.
>>> 4) Overwrite the start offset with the end offset (this should be done after
>>> processing the data).
>>>
>>> Currently, to make it work in batch mode, you need to maintain the offset
>>> state externally.
>>>
>>>
>>> Thanks
>>> Anil
>>>
>>> -Sent from my mobile
>>> http://anilkulkarni.com/
>>>
>>> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li  wrote:
>>>
 Hi all,

 My use case is to read from a single Kafka topic using a batch Spark SQL job
 (not Structured Streaming, ideally). Every time this batch job starts, I want
 it to pick up the last offset it stopped at and read from there until it
 catches up to the latest offset, then store the result and stop. Given that
 the dataframe has partition and offset columns, my first thought for offset
 management is to groupBy partition and aggregate the max offset, then store
 it in HDFS. The next time the job runs, it will read this value and start
 from that max offset using startingOffsets.

 However, I was wondering if this will work. If the Kafka producer fails on an
 offset and later decides to resend it, I will have skipped it, since I'm
 starting from the max offset sent. How does Spark Structured Streaming know
 to continue onwards - does it keep a state of all offsets seen? If so, how
 can I replicate this for batch without missing data? Any help would be
 appreciated.


 --
 Cheers,
 Ruijing Li

>>> --
> Cheers,
> Ruijing Li
>


Re: Best way to read batch from Kafka and Offsets

2020-02-04 Thread Ruijing Li
Thanks Anil, I think that’s the approach I will take.

Hi Burak,

That was a possibility to think about, but my team has custom dataframe
writer functions we would like to use; unfortunately they were written with
static dataframes in mind. I do see there is a foreachBatch write mode, but
my thinking was that at that point it would be easier to read from Kafka in
batch mode.

Thanks,
RJ

On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz  wrote:

> Hi Ruijing,
>
> Why do you not want to use structured streaming here? This is exactly what
> structured streaming + Trigger.Once was built for, so that you don't have to
> build that solution yourself.
> You also get exactly-once semantics if you use the built-in sinks.
>
> Best,
> Burak
>
> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni  wrote:
>
>> Hi Ruijing,
>>
>> We did the following to read Kafka in batch from Spark:
>>
>> 1) Maintain the start offset (could be a DB, a file, etc.)
>> 2) Get the end offset dynamically when the job executes.
>> 3) Pass the start and end offsets.
>> 4) Overwrite the start offset with the end offset (this should be done after
>> processing the data).
>>
>> Currently, to make it work in batch mode, you need to maintain the offset
>> state externally.
>>
>>
>> Thanks
>> Anil
>>
>> -Sent from my mobile
>> http://anilkulkarni.com/
>>
>> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li  wrote:
>>
>>> Hi all,
>>>
>>> My use case is to read from a single Kafka topic using a batch Spark SQL job
>>> (not Structured Streaming, ideally). Every time this batch job starts, I want
>>> it to pick up the last offset it stopped at and read from there until it
>>> catches up to the latest offset, then store the result and stop. Given that
>>> the dataframe has partition and offset columns, my first thought for offset
>>> management is to groupBy partition and aggregate the max offset, then store
>>> it in HDFS. The next time the job runs, it will read this value and start
>>> from that max offset using startingOffsets.
>>>
>>> However, I was wondering if this will work. If the Kafka producer fails on an
>>> offset and later decides to resend it, I will have skipped it, since I'm
>>> starting from the max offset sent. How does Spark Structured Streaming know
>>> to continue onwards - does it keep a state of all offsets seen? If so, how
>>> can I replicate this for batch without missing data? Any help would be
>>> appreciated.
>>>
>>>
>>> --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
Cheers,
Ruijing Li


Data locality

2020-02-04 Thread Karthik Srinivas
Hi all,

I am using Spark 2.3.2 and I am running into data locality issues: even after
setting spark.locality.wait.rack=200, the locality level is always RACK_LOCAL.
Can someone help me with this?

Thank you


Re: Best way to read batch from Kafka and Offsets

2020-02-04 Thread Burak Yavuz
Hi Ruijing,

Why do you not want to use structured streaming here? This is exactly what
structured streaming + Trigger.Once was built for, so that you don't have to
build that solution yourself.
You also get exactly-once semantics if you use the built-in sinks.

Best,
Burak

On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni  wrote:

> Hi Ruijing,
>
> We did the following to read Kafka in batch from Spark:
>
> 1) Maintain the start offset (could be a DB, a file, etc.)
> 2) Get the end offset dynamically when the job executes.
> 3) Pass the start and end offsets.
> 4) Overwrite the start offset with the end offset (this should be done after
> processing the data).
>
> Currently, to make it work in batch mode, you need to maintain the offset
> state externally.
>
>
> Thanks
> Anil
>
> -Sent from my mobile
> http://anilkulkarni.com/
>
> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li  wrote:
>
>> Hi all,
>>
>> My use case is to read from a single Kafka topic using a batch Spark SQL job
>> (not Structured Streaming, ideally). Every time this batch job starts, I want
>> it to pick up the last offset it stopped at and read from there until it
>> catches up to the latest offset, then store the result and stop. Given that
>> the dataframe has partition and offset columns, my first thought for offset
>> management is to groupBy partition and aggregate the max offset, then store
>> it in HDFS. The next time the job runs, it will read this value and start
>> from that max offset using startingOffsets.
>>
>> However, I was wondering if this will work. If the Kafka producer fails on an
>> offset and later decides to resend it, I will have skipped it, since I'm
>> starting from the max offset sent. How does Spark Structured Streaming know
>> to continue onwards - does it keep a state of all offsets seen? If so, how
>> can I replicate this for batch without missing data? Any help would be
>> appreciated.
>>
>>
>> --
>> Cheers,
>> Ruijing Li
>>
>


Committer to use if "spark.sql.sources.partitionOverwriteMode": 'dynamic'

2020-02-04 Thread edge7
Hi,

I am using Spark on EMR and was hoping to use their optimised committer, but
it looks like it will not be used if
"spark.sql.sources.partitionOverwriteMode" is set to 'dynamic'.

What are the best practices in this case?
The renaming phase in S3 is very slow and is the bottleneck in my job.

Thanks,







Re: shuffle mathematic formulat

2020-02-04 Thread Aironman DirtDiver
I would have to check, but in principle it could be done by checking the
streaming logs: once you detect when a shuffle operation starts and ends, you
can work out the total time of the operation.


https://stackoverflow.com/questions/27276884/what-is-shuffle-read-shuffle-write-in-apache-spark
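
If you want the same numbers programmatically rather than reading them off the
UI, one option (an untested sketch, assuming an active SparkSession named spark)
is to register a SparkListener and sum the per-task shuffle metrics per stage:

import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

val shuffleRead = mutable.Map.empty[Int, Long].withDefaultValue(0L)
val shuffleWrite = mutable.Map.empty[Int, Long].withDefaultValue(0L)

spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val m = taskEnd.taskMetrics
    if (m != null) { // metrics can be missing for failed tasks
      shuffleRead(taskEnd.stageId) += m.shuffleReadMetrics.totalBytesRead
      shuffleWrite(taskEnd.stageId) += m.shuffleWriteMetrics.bytesWritten
    }
  }
})

// After the job runs, shuffleRead / shuffleWrite hold per-stage byte counts, which
// should line up with the "Shuffle Read" / "Shuffle Write" columns in the UI.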

On Tue, Feb 4, 2020 at 12:58, asma zgolli ()
wrote:

> Dear Spark contributors,
>
> I'm searching for a way to model Spark shuffle cost, and I wonder if there
> are mathematical formulas to compute the "shuffle read" and "shuffle write"
> sizes shown in the Stages view of the Spark UI.
> If there aren't, are there any references to get a head start on this?
> [Spark UI Stages table columns: Stage Id, Description, Submitted, Duration,
> Tasks: Succeeded/Total, Input, Output, Shuffle Read, Shuffle Write]
> 
>
> thank you for the help and the directions
> yours sincerely
> Asma ZGOLLI
>
> Ph.D. student in data engineering - computer science
>


-- 
Alonso Isidoro Roman
about.me/alonso.isidoro.roman



shuffle mathematic formulat

2020-02-04 Thread asma zgolli
Dear Spark contributors,

I'm searching for a way to model Spark shuffle cost, and I wonder if there
are mathematical formulas to compute the "shuffle read" and "shuffle write"
sizes shown in the Stages view of the Spark UI.
If there aren't, are there any references to get a head start on this?
[Spark UI Stages table columns: Stage Id, Description, Submitted, Duration,
Tasks: Succeeded/Total, Input, Output, Shuffle Read, Shuffle Write]


thank you for the help and the directions
yours sincerely
Asma ZGOLLI

Ph.D. student in data engineering - computer science