Re: Best way to read batch from Kafka and Offsets

2020-02-05 Thread Ruijing Li
Looks like I was wrong, since I tried that exact snippet and it worked.

So to be clear: the batchDF.write.parquet part is not the exact code I'm
using.

I'm using a custom write function that does something similar to
write.parquet but has some added functionality. Somehow my custom write
function isn't working correctly.
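
Roughly, the call looks like this (customWrite below is just a stand-in for
our internal writer, so treat it as a sketch rather than the real code):

// assumes the usual imports (org.apache.spark.sql.DataFrame,
// org.apache.spark.sql.streaming.Trigger)
df.writeStream
  .trigger(Trigger.Once())
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // customWrite is a placeholder for our internal writer; it only touches
    // batchDF (the per-batch static frame), never the original streaming df
    customWrite(batchDF, hdfsPath)
  }
  .option("checkpointLocation", anotherHdfsPath)
  .start()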

Is batchDF a static dataframe though?

Thanks

On Wed, Feb 5, 2020 at 6:13 PM Ruijing Li  wrote:

> Hi all,
>
> I tried with foreachBatch but got an error. Is this expected?
>
> Code is
>
> df.writeStream.trigger(Trigger.Once).foreachBatch { (batchDF, batchId) =>
>   batchDF.write.parquet(hdfsPath)
> }
> .option("checkpointLocation", anotherHdfsPath)
> .start()
>
> Exception is: Queries with streaming sources must be executed with
> writeStream.start()
>
> But I thought foreachBatch would treat the batchDF as a static dataframe?
>
> Thanks,
> RJ
>
> On Wed, Feb 5, 2020 at 12:48 AM Gourav Sengupta 
> wrote:
>
>> Hi Burak,
>>
>> I am not quite used to streaming, but I was almost thinking along the
>> same lines :) It makes a lot of sense to me now.
>>
>> Regards,
>> Gourav
>>
>> On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz  wrote:
>>
>>> Do you really want to build all of that and open yourself to bugs when
>>> you can just use foreachBatch? Here are your options:
>>>
>>> 1. Build it yourself
>>>
>>> // Read offsets from some store
>>> prevOffsets = readOffsets()
>>> latestOffsets = getOffsets()
>>>
>>> df = spark.read.format("kafka").option("startingOffsets",
>>> prevOffsets).option("endingOffsets", latestOffsets).load()
>>> batchLogic(df)
>>>
>>> saveOffsets(latestOffsets)
>>>
>>> 2. Structured Streaming + Trigger.Once + foreachBatch
>>>
>>> spark.readStream.format("kafka").load().writeStream.foreachBatch((df,
>>> batchId) => batchLogic(df)).trigger(Trigger.Once()).start()
>>>
>>> With Option (1), you're going to have to (re)solve:
>>>  a) Tracking and consistency of offsets
>>>  b) Potential topic partition mismatches
>>>  c) Offsets that may have aged out due to retention
>>>  d) Re-execution of jobs and data consistency. What if your job fails as
>>> you're committing the offsets in the end, but the data was already stored?
>>> Will your getOffsets method return the same offsets?
>>>
>>> I'd rather not solve problems that other people have solved for me, but
>>> ultimately the decision is yours to make.
>>>
>>> Best,
>>> Burak
>>>
>>>
>>>
>>>
>>> On Tue, Feb 4, 2020 at 4:41 PM Ruijing Li  wrote:
>>>
 Thanks Anil, I think that’s the approach I will take.

 Hi Burak,

 That was a possibility to think about, but my team has custom dataframe
 writer functions we would like to use; unfortunately, they were written
 with static dataframes in mind. I do see there is a foreachBatch write
 mode, but my thinking was that at that point it was easier to read from
 Kafka in batch mode.

 Thanks,
 RJ

 On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz  wrote:

> Hi Ruijing,
>
> Why do you not want to use structured streaming here? This is exactly
> why structured streaming + Trigger.Once was built, just so that you don't
> build that solution yourself.
> You also get exactly once semantics if you use the built in sinks.
>
> Best,
> Burak
>
> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni 
> wrote:
>
>> Hi Ruijing,
>>
>> We did the following to read Kafka in batch from Spark:
>>
>> 1) Maintain the start offset (could be a db, a file, etc.)
>> 2) Get the end offset dynamically when the job executes.
>> 3) Pass the start and end offsets to the Kafka reader.
>> 4) Overwrite the start offset with the end offset (this should be done
>> after processing the data).
>>
>> Currently, to make this work in batch mode, you need to maintain the
>> offset state externally.
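>>
>> A minimal sketch of that flow (the broker list, topic name, offset format
>> and helper functions below are placeholders, not the actual code we run):
>>
>> // offsets saved by the previous run as a startingOffsets JSON string,
>> // e.g. {"my-topic":{"0":42}}, or "earliest" on the very first run
>> val startOffsets = readSavedOffsets()          // placeholder helper
>>
>> val df = spark.read.format("kafka")
>>   .option("kafka.bootstrap.servers", brokers)  // placeholder
>>   .option("subscribe", "my-topic")             // placeholder
>>   .option("startingOffsets", startOffsets)
>>   .option("endingOffsets", "latest")           // end offset resolved at run time
>>   .load()
>>
>> batchLogic(df)                                 // the actual processing
>>
>> // after processing, overwrite the saved offsets with (max offset + 1)
>> // per partition so the next run starts where this one ended
>> saveOffsets(df.groupBy("partition").agg(max("offset") + 1))  // placeholder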
>>
>>
>> Thanks
>> Anil
>>
>> -Sent from my mobile
>> http://anilkulkarni.com/
>>
>> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li 
>> wrote:
>>
>>> Hi all,
>>>
>>> My use case is to read from a single Kafka topic using a batch Spark
>>> SQL job (not structured streaming, ideally). Every time this batch job
>>> starts, I want it to pick up the last offset it stopped at, read from
>>> there until it catches up to the latest offset, store the result, and
>>> stop the job. Given that the dataframe has a partition and an offset
>>> column, my first thought for offset management is to groupBy partition
>>> and agg the max offset, then store it in HDFS. Next time the job runs,
>>> it will read this max offset and start from it using startingOffsets.
>>>
>>> However, I was wondering if this will work. If the kafka producer
>>> failed an offset and later decides to resend it, I will have skipped it
>>> since I'm starting from the max offset sent. How does spark structured
>>> streaming know to continue onwards - does it keep a state of all
>>> offsets seen? If so, how can I replicate this for batch without missing
>>> data? Any help would be appreciated.
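>>>
>>> Concretely, the bookkeeping I have in mind is roughly this (the HDFS path
>>> below is just a placeholder):
>>>
>>> // assumes import org.apache.spark.sql.functions.max
>>> val maxOffsets = df.groupBy("partition")
>>>   .agg(max("offset").as("maxOffset"))
>>> // persist the per-partition max offsets for the next run
>>> maxOffsets.write.mode("overwrite").json("hdfs:///offsets/my-topic")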

Re: Best way to read batch from Kafka and Offsets

2020-02-05 Thread Ruijing Li
Hi all,

I tried with foreachBatch but got an error. Is this expected?

Code is

df.writeStream.trigger(Trigger.Once).foreachBatch { (batchDF, batchId) =>
  batchDF.write.parquet(hdfsPath)
}
.option("checkpointLocation", anotherHdfsPath)
.start()

Exception is: Queries with streaming sources must be executed with
writeStream.start()

But I thought foreachBatch would treat the batchDF as a static dataframe?
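
A quick way to check that (just the snippet above with one extra line, as a
sketch) would be to log batchDF.isStreaming inside the batch function; it
should print false if the batch is really handed over as a static dataframe:

df.writeStream.trigger(Trigger.Once).foreachBatch { (batchDF, batchId) =>
  println(s"batch $batchId isStreaming = ${batchDF.isStreaming}") // expect false
  batchDF.write.parquet(hdfsPath)
}
.option("checkpointLocation", anotherHdfsPath)
.start()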

Thanks,
RJ

On Wed, Feb 5, 2020 at 12:48 AM Gourav Sengupta 
wrote:

> Hi Burak,
>
> I am not quite used to streaming, but I was almost thinking along the
> same lines :) It makes a lot of sense to me now.
>
> Regards,
> Gourav
>
> On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz  wrote:
>
>> Do you really want to build all of that and open yourself to bugs when
>> you can just use foreachBatch? Here are your options:
>>
>> 1. Build it yourself
>>
>> // Read offsets from some store
>> prevOffsets = readOffsets()
>> latestOffsets = getOffsets()
>>
>> df = spark.read.format("kafka").option("startingOffsets",
>> prevOffsets).option("endingOffsets", latestOffsets).load()
>> batchLogic(df)
>>
>> saveOffsets(latestOffsets)
>>
>> 2. Structured Streaming + Trigger.Once + foreachBatch
>>
>> spark.readStream.format("kafka").load().writeStream.foreachBatch((df,
>> batchId) => batchLogic(df)).trigger(Trigger.Once()).start()
>>
>> With Option (1), you're going to have to (re)solve:
>>  a) Tracking and consistency of offsets
>>  b) Potential topic partition mismatches
>>  c) Offsets that may have aged out due to retention
>>  d) Re-execution of jobs and data consistency. What if your job fails as
>> you're committing the offsets in the end, but the data was already stored?
>> Will your getOffsets method return the same offsets?
>>
>> I'd rather not solve problems that other people have solved for me, but
>> ultimately the decision is yours to make.
>>
>> Best,
>> Burak
>>
>>
>>
>>
>> On Tue, Feb 4, 2020 at 4:41 PM Ruijing Li  wrote:
>>
>>> Thanks Anil, I think that’s the approach I will take.
>>>
>>> Hi Burak,
>>>
>>> That was a possibility to think about, but my team has custom dataframe
>>> writer functions we would like to use; unfortunately, they were written
>>> with static dataframes in mind. I do see there is a foreachBatch write
>>> mode, but my thinking was that at that point it was easier to read from
>>> Kafka in batch mode.
>>>
>>> Thanks,
>>> RJ
>>>
>>> On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz  wrote:
>>>
 Hi Ruijing,

 Why do you not want to use structured streaming here? This is exactly
 why structured streaming + Trigger.Once was built, just so that you don't
 build that solution yourself.
 You also get exactly once semantics if you use the built in sinks.

 Best,
 Burak

 On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni  wrote:

> Hi Ruijing,
>
> We did the following to read Kafka in batch from Spark:
>
> 1) Maintain the start offset (could be a db, a file, etc.)
> 2) Get the end offset dynamically when the job executes.
> 3) Pass the start and end offsets to the Kafka reader.
> 4) Overwrite the start offset with the end offset (this should be done
> after processing the data).
>
> Currently, to make this work in batch mode, you need to maintain the
> offset state externally.
>
>
> Thanks
> Anil
>
> -Sent from my mobile
> http://anilkulkarni.com/
>
> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li 
> wrote:
>
>> Hi all,
>>
>> My use case is to read from a single Kafka topic using a batch Spark
>> SQL job (not structured streaming, ideally). Every time this batch job
>> starts, I want it to pick up the last offset it stopped at, read from
>> there until it catches up to the latest offset, store the result, and
>> stop the job. Given that the dataframe has a partition and an offset
>> column, my first thought for offset management is to groupBy partition
>> and agg the max offset, then store it in HDFS. Next time the job runs,
>> it will read this max offset and start from it using startingOffsets.
>>
>> However, I was wondering if this will work. If the kafka producer
>> failed an offset and later decides to resend it, I will have skipped it
>> since I'm starting from the max offset sent. How does spark structured
>> streaming know to continue onwards - does it keep a state of all offsets
>> seen? If so, how can I replicate this for batch without missing data? Any
>> help would be appreciated.
>>
>>
>> --
>> Cheers,
>> Ruijing Li
>>
> --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
Cheers,
Ruijing Li


subscribe

2020-02-05 Thread Cool Joe
subscribe


SparkAppHandle can not stop application in yarn client mode

2020-02-05 Thread Zhang Victor
Hi all,

When using SparkLauncher to start an app in yarn client mode,
sparkAppHandle#stop() cannot stop the application.


SparkLauncher launcher = new SparkLauncher()
    .setAppName("My Launcher")
    .setJavaHome("/usr/bin/hadoop/software/java")
    .setSparkHome("/usr/bin/hadoop/software/sparkonyarn")
    .setConf("spark.executor.instances", "1")
    .setConf("spark.executor.memory", "1G")
    .setConf(SparkLauncher.EXECUTOR_CORES, "1")
    .setAppResource(jarPath)
    .setMainClass(mainClass);

SparkAppHandle sparkAppHandle =
    launcher.startApplication(new SparkAppHandle.Listener() {...});

After the app has started, the launcher process receives a signal from
outside and tries to stop the Spark app:


sparkAppHandle.stop();

But the spark app is still running.

In yarn cluster mode, sparkAppHandle#stop() can stop the spark app.

I found this code in org/apache/spark/deploy/yarn/Client.scala:


private val launcherBackend = new LauncherBackend() {
  override protected def conf: SparkConf = sparkConf

  override def onStopRequest(): Unit = {
    if (isClusterMode && appId != null) {
      yarnClient.killApplication(appId)
    } else {
      setState(SparkAppHandle.State.KILLED)
      stop()
    }
  }
}

def stop(): Unit = {
  launcherBackend.close()
  yarnClient.stop()
}

It seems this only stops the YARN client and does not kill the Spark app itself.
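
As a possible workaround (an untested sketch on my side), I am thinking of
giving stop() some time and then falling back to sparkAppHandle.kill(),
which for apps launched as a child process should also tear that process
down:

sparkAppHandle.stop()
val deadline = System.currentTimeMillis() + 30 * 1000   // 30s, arbitrary
while (!sparkAppHandle.getState.isFinal &&
       System.currentTimeMillis() < deadline) {
  Thread.sleep(1000)
}
if (!sparkAppHandle.getState.isFinal) {
  sparkAppHandle.kill()   // more forceful than stop()
}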

My Spark version is 2.3.1; I also tried 3.0.0-preview, but got the same result.

Can anyone help me?

Thanks.


Re: Best way to read batch from Kafka and Offsets

2020-02-05 Thread Gourav Sengupta
Hi Burak,

I am not quite used to streaming, but I was almost thinking along the same
lines :) It makes a lot of sense to me now.

Regards,
Gourav

On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz  wrote:

> Do you really want to build all of that and open yourself to bugs when you
> can just use foreachBatch? Here are your options:
>
> 1. Build it yourself
>
> // Read offsets from some store
> prevOffsets = readOffsets()
> latestOffsets = getOffsets()
>
> df = spark.read.format("kafka").option("startingOffsets",
> prevOffsets).option("endingOffsets", latestOffsets).load()
> batchLogic(df)
>
> saveOffsets(latestOffsets)
>
> 2. Structured Streaming + Trigger.Once + foreachBatch
>
> spark.readStream.format("kafka").load().writeStream.foreachBatch((df,
> batchId) => batchLogic(df)).trigger(Trigger.Once()).start()
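>
> In practice, option (2) just needs a checkpoint location so the source
> offsets are tracked for you across runs; a slightly fuller sketch (broker
> list, topic and paths below are placeholders):
>
> spark.readStream.format("kafka")
>   .option("kafka.bootstrap.servers", brokers)   // placeholder
>   .option("subscribe", "my-topic")              // placeholder
>   .load()
>   .writeStream
>   .foreachBatch((df: DataFrame, batchId: Long) => batchLogic(df))
>   .option("checkpointLocation", "hdfs:///checkpoints/my-job")
>   .trigger(Trigger.Once())
>   .start()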
>
> With Option (1), you're going to have to (re)solve:
>  a) Tracking and consistency of offsets
>  b) Potential topic partition mismatches
>  c) Offsets that may have aged out due to retention
>  d) Re-execution of jobs and data consistency. What if your job fails as
> you're committing the offsets in the end, but the data was already stored?
> Will your getOffsets method return the same offsets?
>
> I'd rather not solve problems that other people have solved for me, but
> ultimately the decision is yours to make.
>
> Best,
> Burak
>
>
>
>
> On Tue, Feb 4, 2020 at 4:41 PM Ruijing Li  wrote:
>
>> Thanks Anil, I think that’s the approach I will take.
>>
>> Hi Burak,
>>
>> That was a possibility to think about, but my team has custom dataframe
>> writer functions we would like to use; unfortunately, they were written
>> with static dataframes in mind. I do see there is a foreachBatch write
>> mode, but my thinking was that at that point it was easier to read from
>> Kafka in batch mode.
>>
>> Thanks,
>> RJ
>>
>> On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz  wrote:
>>
>>> Hi Ruijing,
>>>
>>> Why do you not want to use structured streaming here? This is exactly
>>> why structured streaming + Trigger.Once was built, just so that you don't
>>> build that solution yourself.
>>> You also get exactly once semantics if you use the built in sinks.
>>>
>>> Best,
>>> Burak
>>>
>>> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni  wrote:
>>>
 Hi Ruijing,

 We did the following to read Kafka in batch from Spark:

 1) Maintain the start offset (could be a db, a file, etc.)
 2) Get the end offset dynamically when the job executes.
 3) Pass the start and end offsets to the Kafka reader.
 4) Overwrite the start offset with the end offset (this should be done
 after processing the data).

 Currently, to make this work in batch mode, you need to maintain the
 offset state externally.


 Thanks
 Anil

 -Sent from my mobile
 http://anilkulkarni.com/

 On Mon, Feb 3, 2020, 12:39 AM Ruijing Li  wrote:

> Hi all,
>
> My use case is to read from a single Kafka topic using a batch Spark SQL
> job (not structured streaming, ideally). Every time this batch job starts,
> I want it to pick up the last offset it stopped at, read from there until
> it catches up to the latest offset, store the result, and stop the job.
> Given that the dataframe has a partition and an offset column, my first
> thought for offset management is to groupBy partition and agg the max
> offset, then store it in HDFS. Next time the job runs, it will read this
> max offset and start from it using startingOffsets.
>
> However, I was wondering if this will work. If the kafka producer
> failed an offset and later decides to resend it, I will have skipped it
> since I'm starting from the max offset sent. How does spark structured
> streaming know to continue onwards - does it keep a state of all offsets
> seen? If so, how can I replicate this for batch without missing data? Any
> help would be appreciated.
>
>
> --
> Cheers,
> Ruijing Li
>
 --
>> Cheers,
>> Ruijing Li
>>
>