Re: [Spark Streaming]: Save the records that are dropped by watermarking in spark structured streaming

2024-05-08 Thread Mich Talebzadeh
You may consider one of the following:

- Increase Watermark Retention: Consider increasing the watermark retention
duration (the delay threshold passed to withWatermark). This keeps records
in state for a longer period before they are dropped. However, it might
increase state size and processing latency if the watermark lags behind
real time.

OR

- Use a separate stream for dropped records: Create a separate streaming
pipeline to process the dropped records:


   - Filter: Filter out records older than the watermark in the main
   pipeline, for example:

   resultC = streamingDataFrame.select( \
 col("parsed_value.rowkey").alias("rowkey") \
   , col("parsed_value.timestamp").alias("timestamp") \
   , col("parsed_value.temperature").alias("temperature"))

"""
We work out the window and the AVG(temperature) in the window's
timeframe below
This should return back the following Dataframe as struct

 root
 |-- window: struct (nullable = false)
 ||-- start: timestamp (nullable = true)
 ||-- end: timestamp (nullable = true)
 |-- avg(temperature): double (nullable = true)

"""
resultM = resultC. \
     withWatermark("timestamp", "5 minutes"). \
     groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
     avg('temperature')

   - Write to Sink: Write the filtered (dropped) records to a
   separate Kafka topic.
   - Consume and Store: Consume the dropped-records topic with another
   streaming job and store the records in a Postgres table or S3 using a
   suitable connector. A minimal sketch of the filter and sink steps follows.
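
This is only a sketch: the bootstrap servers, topic name and checkpoint path
are placeholders, and the cut-off below merely approximates the engine's
internal event-time watermark, which is not directly exposed to the query.

from pyspark.sql import functions as F

# Approximate the watermark cut-off as processing time minus the delay.
late_records = streamingDataFrame.filter(
    F.col("parsed_value.timestamp") <
    F.expr("current_timestamp() - INTERVAL 5 minutes")
)

(late_records
    .selectExpr("CAST(parsed_value.rowkey AS STRING) AS key",
                "to_json(struct(parsed_value.*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")   # placeholder
    .option("topic", "dropped_records")                 # placeholder topic
    .option("checkpointLocation", "/tmp/chk/dropped")   # placeholder path
    .start())

The second job then simply subscribes to the dropped_records topic and writes
the records to Postgres or S3 inside foreachBatch.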


HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Wernher von Braun)".


On Wed, 8 May 2024 at 05:13, Nandha Kumar  wrote:

> Hi Team,
>We are trying to use *spark structured streaming *for our use
> case. We will be joining 2 streaming sources(from kafka topic) with
> watermarks. As time progresses, the records that are prior to the watermark
> timestamp are removed from the state. For our use case, we want to *store
> these dropped records* in some postgres table or s3.
>
> When searching, we found a similar question
> in
> StackOverflow which is unanswered.
> *We would like to know how to store these dropped records due to the
> watermark.*
>


[Spark Streaming]: Save the records that are dropped by watermarking in spark structured streaming

2024-05-07 Thread Nandha Kumar
Hi Team,
   We are trying to use *Spark Structured Streaming* for our use case.
We will be joining two streaming sources (from Kafka topics) with watermarks.
As time progresses, the records that are older than the watermark timestamp
are removed from the state. For our use case, we want to *store these
dropped records* in some Postgres table or S3.

When searching, we found a similar question on Stack Overflow, which is
unanswered.
*We would like to know how to store these dropped records due to the
watermark.*


Re: ********Spark streaming issue to Elastic data**********

2024-05-06 Thread Mich Talebzadeh
Hi Karthick,

Unfortunately, materialised views are not available in Spark as yet. I
raised Jira [SPARK-48117] Spark Materialized Views: Improve Query
Performance and Data Management - ASF JIRA (apache.org) as a feature
request.

Let me think about another way and revert

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Wernher von Braun)".


On Mon, 6 May 2024 at 07:54, Karthick Nk  wrote:

> Thanks Mich,
>
> can you please confirm me is my understanding correct?
>
> First, we have to create the materialized view based on the mapping
> details we have by using multiple tables as source(since we have multiple
> join condition from different tables). From the materialised view we can
> stream the view data into elastic index by using cdc?
>
> Thanks in advance.
>
> On Fri, May 3, 2024 at 3:39 PM Mich Talebzadeh 
> wrote:
>
>> My recommendation! is using materialized views (MVs) created in Hive with
>> Spark Structured Streaming and Change Data Capture (CDC) is a good
>> combination for efficiently streaming view data updates in your scenario.
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von Braun
>> )".
>>
>>
>> On Thu, 2 May 2024 at 21:25, Karthick Nk  wrote:
>>
>>> Hi All,
>>>
>>> Requirements:
>>> I am working on the data flow, which will use the view definition(view
>>> definition already defined in schema), there are multiple tables used in
>>> the view definition. Here we want to stream the view data into elastic
>>> index based on if any of the table(used in the view definition) data got
>>> changed.
>>>
>>>
>>> Current flow:
>>> 1. we are inserting id's from the table(which used in the view
>>> definition) into the common table.
>>> 2. From the common table by using the id, we will be streaming the view
>>> data (by using if any of the incomming id is present in the collective id
>>> of all tables used from view definition) by using spark structured
>>> streaming.
>>>
>>>
>>> Issue:
>>> 1. Here we are facing issue - For each incomming id here we running view
>>> definition(so it will read all the data from all the data) and check if any
>>> of the incomming id is present in the collective id's of view result, Due
>>> to which it is taking more memory in the cluster driver and taking more
>>> time to process.
>>>
>>>
>>> I am epxpecting an alternate solution, if we can avoid full scan of view
>>> definition every time, If you have any alternate deisgn flow how we can
>>> achieve the result, please suggest for the same.
>>>
>>>
>>> Note: Also, it will be helpfull, if you can share the details like
>>> community forum or platform to discuss this kind of deisgn related topics,
>>> it will be more helpfull.
>>>
>>


Re: ********Spark streaming issue to Elastic data**********

2024-05-06 Thread Karthick Nk
Thanks Mich,

Can you please confirm whether my understanding is correct?

First, we have to create the materialized view based on the mapping details
we have, using multiple tables as the source (since we have join conditions
from different tables). From the materialised view, can we then stream the
view data into the Elastic index by using CDC?

Thanks in advance.

On Fri, May 3, 2024 at 3:39 PM Mich Talebzadeh 
wrote:

> My recommendation! is using materialized views (MVs) created in Hive with
> Spark Structured Streaming and Change Data Capture (CDC) is a good
> combination for efficiently streaming view data updates in your scenario.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Thu, 2 May 2024 at 21:25, Karthick Nk  wrote:
>
>> Hi All,
>>
>> Requirements:
>> I am working on the data flow, which will use the view definition(view
>> definition already defined in schema), there are multiple tables used in
>> the view definition. Here we want to stream the view data into elastic
>> index based on if any of the table(used in the view definition) data got
>> changed.
>>
>>
>> Current flow:
>> 1. we are inserting id's from the table(which used in the view
>> definition) into the common table.
>> 2. From the common table by using the id, we will be streaming the view
>> data (by using if any of the incomming id is present in the collective id
>> of all tables used from view definition) by using spark structured
>> streaming.
>>
>>
>> Issue:
>> 1. Here we are facing issue - For each incomming id here we running view
>> definition(so it will read all the data from all the data) and check if any
>> of the incomming id is present in the collective id's of view result, Due
>> to which it is taking more memory in the cluster driver and taking more
>> time to process.
>>
>>
>> I am epxpecting an alternate solution, if we can avoid full scan of view
>> definition every time, If you have any alternate deisgn flow how we can
>> achieve the result, please suggest for the same.
>>
>>
>> Note: Also, it will be helpfull, if you can share the details like
>> community forum or platform to discuss this kind of deisgn related topics,
>> it will be more helpfull.
>>
>


Re: ********Spark streaming issue to Elastic data**********

2024-05-03 Thread Mich Talebzadeh
My recommendation: using materialized views (MVs) created in Hive together
with Spark Structured Streaming and Change Data Capture (CDC) is a good
combination for efficiently streaming view data updates in your scenario.
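
As a rough illustration of the streaming half of that idea, the change feed
can be consumed with Structured Streaming and upserted into the Elastic index
via foreachBatch. The sketch below assumes the elasticsearch-hadoop connector
is on the classpath and that the changed rows are exposed as a streaming
table; every name and option value is a placeholder.

def upsert_to_elastic(batch_df, batch_id):
    # Write/update each micro-batch of changed rows in the index
    # (the es.* options belong to the elasticsearch-hadoop connector).
    (batch_df.write
        .format("org.elasticsearch.spark.sql")
        .option("es.nodes", "es-host:9200")        # placeholder
        .option("es.mapping.id", "id")             # placeholder id column
        .option("es.write.operation", "upsert")
        .mode("append")
        .save("my_view_index"))                    # placeholder index

changes = spark.readStream.table("materialized_view_changes")  # placeholder source

(changes.writeStream
    .foreachBatch(upsert_to_elastic)
    .option("checkpointLocation", "/tmp/chk/mv_to_es")          # placeholder
    .start())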

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Wernher von Braun)".


On Thu, 2 May 2024 at 21:25, Karthick Nk  wrote:

> Hi All,
>
> Requirements:
> I am working on the data flow, which will use the view definition(view
> definition already defined in schema), there are multiple tables used in
> the view definition. Here we want to stream the view data into elastic
> index based on if any of the table(used in the view definition) data got
> changed.
>
>
> Current flow:
> 1. we are inserting id's from the table(which used in the view definition)
> into the common table.
> 2. From the common table by using the id, we will be streaming the view
> data (by using if any of the incomming id is present in the collective id
> of all tables used from view definition) by using spark structured
> streaming.
>
>
> Issue:
> 1. Here we are facing issue - For each incomming id here we running view
> definition(so it will read all the data from all the data) and check if any
> of the incomming id is present in the collective id's of view result, Due
> to which it is taking more memory in the cluster driver and taking more
> time to process.
>
>
> I am epxpecting an alternate solution, if we can avoid full scan of view
> definition every time, If you have any alternate deisgn flow how we can
> achieve the result, please suggest for the same.
>
>
> Note: Also, it will be helpfull, if you can share the details like
> community forum or platform to discuss this kind of deisgn related topics,
> it will be more helpfull.
>


********Spark streaming issue to Elastic data**********

2024-05-02 Thread Karthick Nk
Hi All,

Requirements:
I am working on a data flow which uses a view definition (the view
definition is already defined in the schema); there are multiple tables
used in the view definition. We want to stream the view data into an
Elastic index whenever the data of any of the tables used in the view
definition changes.


Current flow:
1. We insert the ids from the tables (which are used in the view definition)
into a common table.
2. From the common table, using those ids, we stream the view data (by
checking if any incoming id is present in the collective ids of all tables
used in the view definition) with Spark Structured Streaming.


Issue:
1. Here we are facing an issue - for each incoming id we run the view
definition (so it reads all the data from all the tables) and check if any
of the incoming ids is present in the collective ids of the view result.
Because of this, it takes more memory on the cluster driver and more time
to process.


I am expecting an alternate solution that avoids a full scan of the view
definition every time. If you have an alternate design flow for achieving
the result, please suggest it.


Note: Also, it would be helpful if you could share details of a community
forum or platform for discussing this kind of design-related topic.


Re: Spark streaming job for kafka transaction does not consume read_committed messages correctly.

2024-04-14 Thread Kidong Lee
Thanks, Mich for your reply.

I agree, it is not so scalable or efficient. But it works correctly for
Kafka transactions, and there is no problem with committing offsets to Kafka
asynchronously for now.

Let me give you some more details about my streaming job.
CustomReceiver does not receive anything from outside; it just forwards a
notice message to run an executor in which the Kafka consumer will be run.
See my CustomReceiver below.

private static class CustomReceiver extends Receiver<String> {  // String element type assumed

    public CustomReceiver() {
        super(StorageLevel.MEMORY_AND_DISK_2());
    }

    @Override
    public void onStart() {
        new Thread(this::receive).start();
    }

    private void receive() {
        String input = "receiver input " + UUID.randomUUID().toString();
        store(input);
    }

    @Override
    public void onStop() {
    }
}


Actually, just one Kafka consumer will be run, which consumes committed
messages from Kafka directly (which is not so scalable, I think). But the
main point of this approach is that the Spark session needs to be used to
save the RDD (the parallelized consumed messages) to an Iceberg table.
The consumed messages are converted to a Spark RDD, which is saved to the
Iceberg table using the Spark session.
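
A hedged PySpark illustration of that save step (the catalog and table names
are placeholders, and it assumes an Iceberg catalog is already configured in
the Spark session):

from pyspark.sql import Row

def save_batch_to_iceberg(spark, consumed_records):
    # consumed_records: a list of dicts built from the polled ConsumerRecords.
    rows = [Row(**r) for r in consumed_records]
    df = spark.createDataFrame(rows)
    # Append into an existing Iceberg table via the DataFrameWriterV2 API.
    df.writeTo("my_catalog.db.events").append()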

I have tested this Spark streaming job with transactional producers which
send several million messages. They were consumed and saved to the Iceberg
tables correctly.

- Kidong.



On Sun, 14 Apr 2024 at 11:05 PM, Mich Talebzadeh wrote:

> Interesting
>
> My concern is infinite Loop in* foreachRDD*: The *while(true)* loop
> within foreachRDD creates an infinite loop within each Spark executor. This
> might not be the most efficient approach, especially since offsets are
> committed asynchronously.?
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Solutions Architect | Data Engineer  | Generative AI
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
> Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Sun, 14 Apr 2024 at 13:40, Kidong Lee  wrote:
>
>>
>> Because spark streaming for kafk transaction does not work correctly to
>> suit my need, I moved to another approach using raw kafka consumer which
>> handles read_committed messages from kafka correctly.
>>
>> My codes look like the following.
>>
>> JavaDStream stream = ssc.receiverStream(new CustomReceiver()); // 
>> CustomReceiver does nothing special except awaking foreach task.
>>
>> stream.foreachRDD(rdd -> {
>>
>>   KafkaConsumer consumer = new 
>> KafkaConsumer<>(consumerProperties);
>>
>>   consumer.subscribe(Arrays.asList(topic));
>>
>>   while(true){
>>
>> ConsumerRecords records =
>> consumer.poll(java.time.Duration.ofMillis(intervalMs));
>>
>> Map offsetMap = new HashMap<>();
>>
>> List someList = new ArrayList<>();
>>
>> for (ConsumerRecord consumerRecord : records) {
>>
>>   // add something to list.
>>
>>   // put offset to offsetMap.
>>
>> }
>>
>> // process someList.
>>
>> // commit offset.
>>
>> consumer.commitAsync(offsetMap, null);
>>
>>   }
>>
>> });
>>
>>
>> In addition, I increased max.poll.records to 10.
>>
>> Even if this raw kafka consumer approach is not so scalable, it consumes
>> read_committed messages from kafka correctly and is enough for me at the
>> moment.
>>
>> - Kidong.
>>
>>
>>
>> 2024년 4월 12일 (금) 오후 9:19, Kidong Lee 님이 작성:
>>
>>> Hi,
>>>
>>> I have a kafka producer which sends messages transactionally to kafka
>>> and spark streaming job which should consume read_committed messages from
>>> kafka.
>>> But there is a problem for spark streaming to consume read_committed
>>> messages.
>>> The count of messages sent by kafka producer transactionally is not the
>>> same to the count of the read_committed messages consumed by spark
>>> streaming.
>>>
>>> Some consumer properties of my spark streaming job are as follows.
>>>
>>> auto.offset.reset=earliest
>>> enable.auto.commit=false
>>> isolat

Re: Spark streaming job for kafka transaction does not consume read_committed messages correctly.

2024-04-14 Thread Mich Talebzadeh
Interesting

My concern is the infinite loop in *foreachRDD*: the *while(true)* loop within
foreachRDD creates an infinite loop within each Spark executor. This might
not be the most efficient approach, especially since offsets are committed
asynchronously.

HTH

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer  | Generative AI
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Sun, 14 Apr 2024 at 13:40, Kidong Lee  wrote:

>
> Because spark streaming for kafk transaction does not work correctly to
> suit my need, I moved to another approach using raw kafka consumer which
> handles read_committed messages from kafka correctly.
>
> My codes look like the following.
>
> JavaDStream stream = ssc.receiverStream(new CustomReceiver()); // 
> CustomReceiver does nothing special except awaking foreach task.
>
> stream.foreachRDD(rdd -> {
>
>   KafkaConsumer consumer = new 
> KafkaConsumer<>(consumerProperties);
>
>   consumer.subscribe(Arrays.asList(topic));
>
>   while(true){
>
> ConsumerRecords records =
> consumer.poll(java.time.Duration.ofMillis(intervalMs));
>
> Map offsetMap = new HashMap<>();
>
> List someList = new ArrayList<>();
>
> for (ConsumerRecord consumerRecord : records) {
>
>   // add something to list.
>
>   // put offset to offsetMap.
>
> }
>
> // process someList.
>
> // commit offset.
>
> consumer.commitAsync(offsetMap, null);
>
>   }
>
> });
>
>
> In addition, I increased max.poll.records to 10.
>
> Even if this raw kafka consumer approach is not so scalable, it consumes
> read_committed messages from kafka correctly and is enough for me at the
> moment.
>
> - Kidong.
>
>
>
> 2024년 4월 12일 (금) 오후 9:19, Kidong Lee 님이 작성:
>
>> Hi,
>>
>> I have a kafka producer which sends messages transactionally to kafka and
>> spark streaming job which should consume read_committed messages from kafka.
>> But there is a problem for spark streaming to consume read_committed
>> messages.
>> The count of messages sent by kafka producer transactionally is not the
>> same to the count of the read_committed messages consumed by spark
>> streaming.
>>
>> Some consumer properties of my spark streaming job are as follows.
>>
>> auto.offset.reset=earliest
>> enable.auto.commit=false
>> isolation.level=read_committed
>>
>>
>> I also added the following spark streaming configuration.
>>
>> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
>> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 
>> 60 * 1000));
>>
>>
>> My spark streaming is using DirectStream like this.
>>
>> JavaInputDStream> stream =
>> KafkaUtils.createDirectStream(
>> ssc,
>> LocationStrategies.PreferConsistent(),
>> ConsumerStrategies.Subscribe(topics, 
>> kafkaParams)
>> );
>>
>>
>> stream.foreachRDD(rdd -> O
>>
>>// get offset ranges.
>>
>>OffsetRange[] offsetRanges = ((HasOffsetRanges) 
>> rdd.rdd()).offsetRanges();
>>
>>// process something.
>>
>>
>>// commit offset.
>>((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
>>
>> }
>> );
>>
>>
>>
>> I have tested with a kafka consumer written with raw kafka-clients jar
>> library without problem that it consumes read_committed messages correctly,
>> and the count of consumed read_committed messages is equal to the count of
>> messages sent by kafka producer.
>>
>>
>> And sometimes, I got the following exception.
>>
>> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
>> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
>> (chango-private-1.chango.private executor driver):
>> java.lang.IllegalArgumentException: requirement failed: Failed to get
>> records for compacted spark-executor-school-student-group school-student-7
>> afte

Re: Spark streaming job for kafka transaction does not consume read_committed messages correctly.

2024-04-14 Thread Kidong Lee
Because Spark streaming for Kafka transactions does not work correctly to
suit my need, I moved to another approach using a raw Kafka consumer, which
handles read_committed messages from Kafka correctly.

My code looks like the following.

JavaDStream<String> stream = ssc.receiverStream(new CustomReceiver());
// CustomReceiver does nothing special except waking the foreach task.

stream.foreachRDD(rdd -> {

  // key/value types assumed to be String (the generic parameters were lost in formatting)
  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);

  consumer.subscribe(Arrays.asList(topic));

  while (true) {

    ConsumerRecords<String, String> records =
        consumer.poll(java.time.Duration.ofMillis(intervalMs));

    Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();

    List<String> someList = new ArrayList<>();

    for (ConsumerRecord<String, String> consumerRecord : records) {

      // add something to the list.

      // put the offset into offsetMap.

    }

    // process someList.

    // commit offsets.

    consumer.commitAsync(offsetMap, null);

  }

});


In addition, I increased max.poll.records to 10.

Even if this raw kafka consumer approach is not so scalable, it consumes
read_committed messages from kafka correctly and is enough for me at the
moment.

- Kidong.



On Fri, 12 Apr 2024 at 9:19 PM, Kidong Lee wrote:

> Hi,
>
> I have a kafka producer which sends messages transactionally to kafka and
> spark streaming job which should consume read_committed messages from kafka.
> But there is a problem for spark streaming to consume read_committed
> messages.
> The count of messages sent by kafka producer transactionally is not the
> same to the count of the read_committed messages consumed by spark
> streaming.
>
> Some consumer properties of my spark streaming job are as follows.
>
> auto.offset.reset=earliest
> enable.auto.commit=false
> isolation.level=read_committed
>
>
> I also added the following spark streaming configuration.
>
> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 60 
> * 1000));
>
>
> My spark streaming is using DirectStream like this.
>
> JavaInputDStream> stream =
> KafkaUtils.createDirectStream(
> ssc,
> LocationStrategies.PreferConsistent(),
> ConsumerStrategies.Subscribe(topics, 
> kafkaParams)
> );
>
>
> stream.foreachRDD(rdd -> O
>
>// get offset ranges.
>
>OffsetRange[] offsetRanges = ((HasOffsetRanges) 
> rdd.rdd()).offsetRanges();
>
>// process something.
>
>
>// commit offset.
>((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
>
> }
> );
>
>
>
> I have tested with a kafka consumer written with raw kafka-clients jar
> library without problem that it consumes read_committed messages correctly,
> and the count of consumed read_committed messages is equal to the count of
> messages sent by kafka producer.
>
>
> And sometimes, I got the following exception.
>
> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
> (chango-private-1.chango.private executor driver):
> java.lang.IllegalArgumentException: requirement failed: Failed to get
> records for compacted spark-executor-school-student-group school-student-7
> after polling for 12
>
> at scala.Predef$.require(Predef.scala:281)
>
> at
> org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
>
> at
> org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
>
> at
> org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
>
> at
> org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)
>
>
>
> I have experienced spark streaming job which works fine with kafka
> messages which are non-transactional, and I never encountered the
> exceptions like above.
> It seems that spark streaming for kafka transaction does not handle such
> as kafka consumer properties like isolation.level=read_committed and
> enable.auto.commit=false correctly.
>
> Any help appreciated.
>
> - Kidong.
>
>
> --
> *이기동 *
> *Kidong Lee*
>
> Email: mykid...@gmail.com
> Chango: https://cloudcheflabs.github.io/chango-private-docs
> Web Site: http://www.cloudchef-labs.com/
> Mobile: +82 10 4981 7297
> <http://www.cloudchef-labs.com/>
>


-- 
*이기동 *
*Kidong Lee*

Email: mykid...@gmail.com
Chango: https://cloudcheflabs.github.io/chango-private-docs
Web Site: http://www.cloudchef-labs.com/
Mobile: +82 10 4981 7297
<http://www.cloudchef-labs.com/>


Re: Spark streaming job for kafka transaction does not consume read_committed messages correctly.

2024-04-13 Thread Kidong Lee
Thank you Mich for your reply.

Actually, I tried to do most of your advice.

When spark.streaming.kafka.allowNonConsecutiveOffsets=false, I got the
following error.

Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most
recent failure: Lost task 0.0 in stage 1.0 (TID 3)
(chango-private-1.chango.private executor driver):
java.lang.IllegalArgumentException: requirement failed: Got wrong record
for spark-executor-school-student-group school-student-7 even after seeking
to offset 11206961 got offset 11206962 instead. If this is a compacted
topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets

at scala.Predef$.require(Predef.scala:281)

at
org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:155)

at
org.apache.spark.streaming.kafka010.KafkaDataConsumer.get(KafkaDataConsumer.scala:40)

at
org.apache.spark.streaming.kafka010.KafkaDataConsumer.get$(KafkaDataConsumer.scala:39)

at
org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:219)


And I tried to increase spark.streaming.kafka.consumer.poll.ms to avoid the
exceptions, but it did not help.


- Kidong.




On Sun, 14 Apr 2024 at 4:25 AM, Mich Talebzadeh wrote:

> Hi Kidong,
>
> There may be few potential reasons why the message counts from your Kafka
> producer and Spark Streaming consumer might not match, especially with
> transactional messages and read_committed isolation level.
>
> 1) Just ensure that both your Spark Streaming job and the Kafka consumer
> written with raw kafka-clients use the same consumer group. Messages are
> delivered to specific consumer groups, and if they differ, Spark Streaming
> might miss messages consumed by the raw consumer.
> 2) Your Spark Streaming configuration sets *enable.auto.commit=false* and
> uses *commitAsync manually*. However, I noted
> *spark.streaming.kafka.allowNonConsecutiveOffsets=true* which may be
> causing the problem. This setting allows Spark Streaming to read offsets
> that are not strictly increasing, which can happen with transactional
> reads. Generally recommended to set this to* false *for transactional
> reads to ensure Spark Streaming only reads committed messages.
> 3) Missed messages, in transactional messages, Kafka guarantees *delivery
> only after the transaction commits successfully. *There could be a slight
> delay between the producer sending the message and it becoming visible to
> consumers under read_committed isolation level. Spark Streaming could
> potentially miss messages during this window.
> 4) The exception Lost task 0.0 in stage 324.0, suggests a problem fetching
> records for a specific topic partition. Review your code handling of
> potential exceptions during rdd.foreachRDD processing. Ensure retries or
> appropriate error handling if encountering issues with specific partitions.
> 5) Try different configurations for *spark.streaming.kafka.consumer.poll.ms
> <http://spark.streaming.kafka.consumer.poll.ms>* to adjust polling
> frequency and potentially improve visibility into committed messages.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Solutions Architect | Data Engineer  | Generative AI
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
> Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Fri, 12 Apr 2024 at 21:38, Kidong Lee  wrote:
>
>> Hi,
>>
>> I have a kafka producer which sends messages transactionally to kafka and
>> spark streaming job which should consume read_committed messages from kafka.
>> But there is a problem for spark streaming to consume read_committed
>> messages.
>> The count of messages sent by kafka producer transactionally is not the
>> same to the count of the read_committed messages consumed by spark
>> streaming.
>>
>> Some consumer properties of my spark streaming job are as follows.
>>
>> auto.offset.reset=earliest
>> enable.auto.commit=false
>> isolation.level=read_committed
>>
>>
>> I also added the following spark streaming configuration.
>>
>> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
>> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 
>> 60 * 1000));
>>
>

Re: Spark streaming job for kafka transaction does not consume read_committed messages correctly.

2024-04-13 Thread Mich Talebzadeh
Hi Kidong,

There may be a few potential reasons why the message counts from your Kafka
producer and Spark Streaming consumer might not match, especially with
transactional messages and read_committed isolation level.

1) Just ensure that both your Spark Streaming job and the Kafka consumer
written with raw kafka-clients use the same consumer group. Messages are
delivered to specific consumer groups, and if they differ, Spark Streaming
might miss messages consumed by the raw consumer.
2) Your Spark Streaming configuration sets *enable.auto.commit=false* and
uses *commitAsync manually*. However, I noted
*spark.streaming.kafka.allowNonConsecutiveOffsets=true* which may be
causing the problem. This setting allows Spark Streaming to read offsets
that are not strictly increasing, which can happen with transactional
reads. Generally recommended to set this to* false *for transactional reads
to ensure Spark Streaming only reads committed messages.
3) Missed messages, in transactional messages, Kafka guarantees *delivery
only after the transaction commits successfully. *There could be a slight
delay between the producer sending the message and it becoming visible to
consumers under read_committed isolation level. Spark Streaming could
potentially miss messages during this window.
4) The exception Lost task 0.0 in stage 324.0, suggests a problem fetching
records for a specific topic partition. Review your code handling of
potential exceptions during rdd.foreachRDD processing. Ensure retries or
appropriate error handling if encountering issues with specific partitions.
5) Try different configurations for *spark.streaming.kafka.consumer.poll.ms*
to adjust the polling frequency and potentially improve visibility into
committed messages.

HTH

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer  | Generative AI
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Fri, 12 Apr 2024 at 21:38, Kidong Lee  wrote:

> Hi,
>
> I have a kafka producer which sends messages transactionally to kafka and
> spark streaming job which should consume read_committed messages from kafka.
> But there is a problem for spark streaming to consume read_committed
> messages.
> The count of messages sent by kafka producer transactionally is not the
> same to the count of the read_committed messages consumed by spark
> streaming.
>
> Some consumer properties of my spark streaming job are as follows.
>
> auto.offset.reset=earliest
> enable.auto.commit=false
> isolation.level=read_committed
>
>
> I also added the following spark streaming configuration.
>
> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 60 
> * 1000));
>
>
> My spark streaming is using DirectStream like this.
>
> JavaInputDStream> stream =
> KafkaUtils.createDirectStream(
> ssc,
> LocationStrategies.PreferConsistent(),
> ConsumerStrategies.Subscribe(topics, 
> kafkaParams)
> );
>
>
> stream.foreachRDD(rdd -> O
>
>// get offset ranges.
>
>OffsetRange[] offsetRanges = ((HasOffsetRanges) 
> rdd.rdd()).offsetRanges();
>
>// process something.
>
>
>// commit offset.
>((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
>
> }
> );
>
>
>
> I have tested with a kafka consumer written with raw kafka-clients jar
> library without problem that it consumes read_committed messages correctly,
> and the count of consumed read_committed messages is equal to the count of
> messages sent by kafka producer.
>
>
> And sometimes, I got the following exception.
>
> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
> (chango-private-1.chango.private executor driver):
> java.lang.IllegalArgumentException: requirement failed: Failed to get
> records for compacted spark-executor-school-student-group school-student-7
> after polling for 12
>
> at scala.Predef$.require(Predef.scala:281)
>
> at
> org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsume

Spark streaming job for kafka transaction does not consume read_committed messages correctly.

2024-04-12 Thread Kidong Lee
Hi,

I have a Kafka producer which sends messages transactionally to Kafka, and a
Spark streaming job which should consume the read_committed messages from
Kafka. But there is a problem with Spark streaming consuming the
read_committed messages: the count of messages sent transactionally by the
Kafka producer is not the same as the count of read_committed messages
consumed by Spark streaming.

Some consumer properties of my spark streaming job are as follows.

auto.offset.reset=earliest
enable.auto.commit=false
isolation.level=read_committed


I also added the following spark streaming configuration.

sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
sparkConf.set("spark.streaming.kafka.consumer.poll.ms",
String.valueOf(2 * 60 * 1000));


My spark streaming is using DirectStream like this.

JavaInputDStream<ConsumerRecord<String, String>> stream =   // key/value types assumed to be String
        KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
        );


stream.foreachRDD(rdd -> {

   // get offset ranges.
   OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

   // process something.


   // commit offsets.
   ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);

  }
);



I have tested with a Kafka consumer written with the raw kafka-clients jar
library without problems; it consumes read_committed messages correctly,
and the count of consumed read_committed messages is equal to the count of
messages sent by the Kafka producer.


And sometimes, I got the following exception.

Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
(chango-private-1.chango.private executor driver):
java.lang.IllegalArgumentException: requirement failed: Failed to get
records for compacted spark-executor-school-student-group school-student-7
after polling for 12

at scala.Predef$.require(Predef.scala:281)

at
org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)

at
org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)

at
org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)

at
org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)



I have experience of Spark streaming jobs which work fine with Kafka
messages that are non-transactional, and I never encountered exceptions
like the above.
It seems that Spark streaming for Kafka transactions does not correctly
handle Kafka consumer properties such as isolation.level=read_committed and
enable.auto.commit=false.

Any help appreciated.

- Kidong.


-- 
*이기동 *
*Kidong Lee*

Email: mykid...@gmail.com
Chango: https://cloudcheflabs.github.io/chango-private-docs
Web Site: http://www.cloudchef-labs.com/
Mobile: +82 10 4981 7297
<http://www.cloudchef-labs.com/>


Kafka-based Spark Streaming and Vertex AI for Sentiment Analysis

2024-02-21 Thread Mich Talebzadeh
I am working on a pet project to implement a real-time sentiment analysis
system for analyzing customer reviews. It leverages Kafka for data
ingestion, Spark Structured Streaming (SSS) for real-time processing, and
Vertex AI for sentiment analysis and potential action triggers.

*Features*

   - Real-time processing of customer reviews using SSS.
   - Sentiment analysis using pre-assigned labels or Vertex AI
   models.
   - Integration with Vertex AI for model deployment and prediction serving.
   - Potential actions based on sentiment analysis results
   (e.g., notifications, database updates).


*Tech stack*

   - Kafka: Stream processing platform for data ingestion.
   - SSS for real-time data processing on incoming messages with cleansing
   - Vertex AI: Machine learning platform for model training


I have created sample JSON data with relevant attributes for a product review,
as shown below.

{
  "rowkey": "7de43681-0e4a-45cb-ad40-5f14f5678333",
  "product_id": "product-id-1616",
  "timereported": "2024-02-21T08:46:40",
  "description": "Easy to use and setup, perfect for beginners.",
  "price": GBP507,
  "sentiment": negative,
  "product_category": "Electronics",
  "customer_id": "customer4",
  "location": "UK",
  "rating": 6,
  "review_text": "Sleek and modern design, but lacking some features.",
  "user_feedback": "Negative",
  "review_source": "online",
  "sentiment_confidence": 0.33,
  "product_features": "user-friendly",
  "timestamp": "",
  "language": "English"
},
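
A minimal sketch of the SSS ingestion step for this payload (the topic name,
the brokers and the schema subset below are assumptions; extend the schema to
cover all of the attributes above):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import (StructType, StructField, StringType,
                               DoubleType, IntegerType)

spark = SparkSession.builder.appName("sentiment-ingest").getOrCreate()

# Subset of the review attributes shown above.
review_schema = StructType([
    StructField("rowkey", StringType()),
    StructField("product_id", StringType()),
    StructField("timereported", StringType()),
    StructField("review_text", StringType()),
    StructField("sentiment", StringType()),
    StructField("sentiment_confidence", DoubleType()),
    StructField("rating", IntegerType()),
])

reviews = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")   # placeholder
    .option("subscribe", "customer_reviews")            # placeholder topic
    .load()
    .select(F.from_json(F.col("value").cast("string"), review_schema)
             .alias("review"))
    .select("review.*"))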

I also attached a high-level diagram. There has recently been demand for
Gemini usage. Your views are appreciated.


Thanks

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* I am an architect and not a data scientist. The information
provided is correct to the best of my knowledge but of course cannot be
guaranteed . It is essential to note that, as with any advice, quote "one test
result is worth one-thousand expert opinions (Wernher von Braun)".

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: [Streaming (DStream) ] : Does Spark Streaming supports pause/resume consumption of message from Kafka?

2023-12-01 Thread Mich Talebzadeh
Ok, pause/resume does throw up some challenges.

The implication is to pause gracefully and resume the same way. First have a
look at this SPIP of mine:

[SPARK-42485] SPIP: Shutting down spark structured streaming when the
streaming process completed current process - ASF JIRA (apache.org)


Then we can assume a graceful pause/restart.

As a suggestion, to implement conditional pausing and resuming, you can
introduce a flag or control signal within your DStream processing logic.
When the condition for pausing is met, the stop() method is called to
temporarily halt message processing. Conversely, when the condition for
resuming is met, the start() method is invoked to re-enable message
consumption.

Let us have a go at it

is_paused = False

def process_stream(message):
    global is_paused
    if not is_paused:
        # Perform processing logic here
        print(message)
    # Check for pausing condition
    if should_pause(message):
        is_paused = True
        stream.stop()
    # Check for resuming condition
    if should_resume() and is_paused:
        is_paused = False
        stream.start()

stream = DStream(source)
stream.foreach(process_stream)
stream.start()

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 1 Dec 2023 at 12:56, Saurabh Agrawal (180813)
 wrote:

> Hi Spark Team,
>
> I am using Spark 3.4.0 version in my application which is use to consume
> messages from Kafka topics.
>
> I have below queries:
>
> 1. Does DStream support pause/resume streaming message consumption at
> runtime on particular condition? If yes, please provide details.
>
> 2. I tried to revoke partition from consumer at runtime which cause error.
>
>
>
> *throw new IllegalStateException(s"Previously tracked partitions " +*
>
> *s"${revokedPartitions.mkString("[", ",", "]")} been revoked by
> Kafka because of consumer " +*
>
> *s"rebalance. This is mostly due to another stream with same group
> id joined, " +*
>
> *s"please check if there're different streaming application
> misconfigure to use same " +*
>
> *s"group id. Fundamentally different stream should use different
> group id")*
>
>
>
>
>
> 3. Does Spark support Blue/Green Deployment. I need to implement
> Blue/Green Deployment scenario with Spark. Facing problem as need to deploy
> both Blue and Green deployment with same consumer-group-id. As I read,
> spark does not support 2 deployment with same consumer group-id, this
> implementation is failing. Please guide how this can be implemented with
> Spark.
>
> 4. Does Spark support Active-Active deployment.
>
>
>
> It will be great if you can reply on above queries please.
>
>
>
> --
>
>
> * Regards,*
>
> *Saurabh Agrawal*
>
> [image: Image]
>
> Software Development Specialist, IPaaS R
> [image: A picture containing logoDescription automatically generated]
>
>
>
>
>
> *This email and the information contained herein is proprietary and
> confidential and subject to the Amdocs Email Terms of Service, which you
> may review at* *https://www.amdocs.com/about/email-terms-of-service*
> 
>


[Streaming (DStream) ] : Does Spark Streaming supports pause/resume consumption of message from Kafka?

2023-12-01 Thread Saurabh Agrawal (180813)
Hi Spark Team,

I am using the Spark 3.4.0 version in my application, which is used to consume
messages from Kafka topics.

I have below queries:

1. Does DStream support pausing/resuming streaming message consumption at runtime
on a particular condition? If yes, please provide details.
2. I tried to revoke a partition from the consumer at runtime, which causes an error:

throw new IllegalStateException(s"Previously tracked partitions " +
s"${revokedPartitions.mkString("[", ",", "]")} been revoked by Kafka 
because of consumer " +
s"rebalance. This is mostly due to another stream with same group id 
joined, " +
s"please check if there're different streaming application misconfigure 
to use same " +
s"group id. Fundamentally different stream should use different group 
id")


3. Does Spark support Blue/Green deployment? I need to implement a Blue/Green
deployment scenario with Spark. I am facing a problem, as I need to deploy both the
Blue and the Green deployment with the same consumer-group-id. As I read it, Spark
does not support two deployments with the same consumer group-id, so this
implementation is failing. Please guide me on how this can be implemented with Spark.
4. Does Spark support Active-Active deployment?

It would be great if you could reply to the above queries.

--

Regards,
Saurabh Agrawal

Software Development Specialist, IPaaS R


This email and the information contained herein is proprietary and confidential 
and subject to the Amdocs Email Terms of Service, which you may review at 
https://www.amdocs.com/about/email-terms-of-service 



Spark streaming sourceArchiveDir does not move file to archive directory

2023-09-19 Thread Yunus Emre G?rses
Hello everyone,

I'm using Scala and Spark version 3.4.1 on Windows 10. While streaming
with Spark, I set the `cleanSource` option to "archive" and the
`sourceArchiveDir` option to "archived", as in the code below.

```
spark.readStream
  .option("cleanSource", "archive")
  .option("sourceArchiveDir", "archived")
  .option("enforceSchema", false)
  .option("header", includeHeader)
  .option("inferSchema", inferSchema)
  .options(otherOptions)
  .schema(csvSchema.orNull)
  .csv(FileUtils.getPath(sourceSettings.dataFolderPath, 
mappingSource.path).toString)
```

The code ```FileUtils.getPath(sourceSettings.dataFolderPath, 
mappingSource.path)``` returns a relative path like: 
test-data\streaming-folder\patients

When I start the stream, Spark does not move the source CSV to the archive folder.
After working on it a bit, I started debugging the Spark source code. I found the
```override protected def cleanTask(entry: FileEntry): Unit``` method in the
`FileStreamSource.scala` file in the `org.apache.spark.sql.execution.streaming`
package.
On line 569, the ```!fileSystem.rename(curPath, newPath)``` code is supposed to
move the source file to the archive folder. However, when I debugged, I noticed
that the curPath and newPath values were as follows:

**curPath**: 
`file:/C:/dev/be/data-integration-suite/test-data/streaming-folder/patients/patients-success.csv`

**newPath**: 
`file:/C:/dev/be/data-integration-suite/archived/C:/dev/be/data-integration-suite/test-data/streaming-folder/patients/patients-success.csv`

It seems that the absolute path of the CSV file was appended when creating
`newPath`, because `C:/dev/be/data-integration-suite` appears twice in the newPath.
This is the reason Spark archiving does not work. Instead, newPath should be:
`file:/C:/dev/be/data-integration-suite/archived/test-data/streaming-folder/patients/patients-success.csv`.
I guess this is more related to the Spark library and maybe it's a Spark bug? Is
there any workaround or Spark config to overcome this problem?

Thanks
Best regards,
Yunus Emre


Re: [Spark streaming]: Microbatch id in logs

2023-06-26 Thread Mich Talebzadeh
In SSS
writeStream. \
   outputMode('append'). \
   option("truncate", "false"). \
   foreachBatch(SendToBigQuery). \
   option('checkpointLocation', checkpoint_path). \

so this writeStream will call  foreachBatch()

   """
   "foreachBatch" performs custom write logic on each
micro-batch through SendToBigQuery function
foreachBatch(SendToBigQuery) expects 2 parameters, first:*
micro-batch as DataFrame or Dataset and second: unique id for each batch*
   Using foreachBatch, we write each micro batch to storage
defined in our custom logic. In this case, we store the output of our
streaming application to Google BigQuery table

that does this

def SendToBigQuery(df, batchId):
    if len(df.take(1)) > 0:
        print(batchId)
        # do your logic
    else:
        print("DataFrame is empty")

You should also have it in

   option('checkpointLocation', checkpoint_path).

See this article of mine:
Processing Change Data Capture with Spark Structured Streaming
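
To address the original question directly (getting the micro-batch id into the
logs), a hedged variant using Python's standard logging module inside
foreachBatch could look like this; the logger name and processing logic are
placeholders:

import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("my_streaming_app")   # placeholder logger name

def SendToBigQuery(df, batchId):
    # Stamp every log line for this micro-batch with its id so that
    # log searches can be filtered per batch.
    if len(df.take(1)) > 0:
        log.info("batchId=%s rows=%s - processing micro-batch", batchId, df.count())
        # ... your write/processing logic here ...
    else:
        log.info("batchId=%s - DataFrame is empty", batchId)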


HTH

Mich Talebzadeh,
Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 26 Jun 2023 at 06:01, Anil Dasari  wrote:

> Hi,
> I am using spark 3.3.1 distribution and spark stream in my application. Is
> there a way to add a microbatch id to all logs generated by spark and spark
> applications ?
>
> Thanks.
>


[Spark streaming]: Microbatch id in logs

2023-06-25 Thread Anil Dasari
Hi,
I am using the Spark 3.3.1 distribution and Spark streaming in my application. Is
there a way to add a micro-batch id to all logs generated by Spark and Spark
applications?

Thanks.


Re: Re: spark streaming and kinesis integration

2023-04-12 Thread Mich Talebzadeh
Hi Lingzhe Sun,

Thanks for your comments. I am afraid I won't be able to take part in this
project and contribute.

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 12 Apr 2023 at 02:55, Lingzhe Sun  wrote:

> Hi Mich,
>
> FYI we're using spark operator(
> https://github.com/GoogleCloudPlatform/spark-on-k8s-operator) to build
> stateful structured streaming on k8s for a year. Haven't test it using
> non-operator way.
>
> Besides that, the main contributor of the spark operator, Yinan Li, has
> been inactive for quite long time. Kind of worried that this project might
> finally become outdated as k8s is evolving. So if anyone is interested,
> please support the project.
>
> --
> Lingzhe Sun
> Hirain Technologies
>
>
> *From:* Mich Talebzadeh 
> *Date:* 2023-04-11 02:06
> *To:* Rajesh Katkar 
> *CC:* user 
> *Subject:* Re: spark streaming and kinesis integration
> What I said was this
> "In so far as I know k8s does not support spark structured streaming?"
>
> So it is an open question. I just recalled it. I have not tested myself. I
> know structured streaming works on Google Dataproc cluster but I have not
> seen any official link that says Spark Structured Streaming is supported on
> k8s.
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 10 Apr 2023 at 06:31, Rajesh Katkar 
> wrote:
>
>> Do you have any link or ticket which justifies that k8s does not support
>> spark streaming ?
>>
>> On Thu, 6 Apr, 2023, 9:15 pm Mich Talebzadeh, 
>> wrote:
>>
>>> Do you have a high level diagram of the proposed solution?
>>>
>>> In so far as I know k8s does not support spark structured streaming?
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar 
>>> wrote:
>>>
>>>> Use case is , we want to read/write to kinesis streams using k8s
>>>> Officially I could not find the connector or reader for kinesis from
>>>> spark like it has for kafka.
>>>>
>>>> Checking here if anyone used kinesis and spark streaming combination ?
>>>>
>>>> On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh, <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi Rajesh,
>>>>>
>>>>> What is the use case for Kinesis here? I have not used it personally,
>>>>> Which use case it concerns
>>>>>
>>>>> https://aws.amazon.com/kinesis/
>>>>>
>>>>> Can you use something else instead?
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Lead Solutions Archi

Re: Re: spark streaming and kinesis integration

2023-04-12 Thread 孙令哲
Hi Rajesh,


It's working fine, at least for now. But you'll need to build your own spark 
image using later versions.


Lingzhe Sun
Hirain Technologies

 







Original:
From: Rajesh Katkar
Date: 2023-04-12 21:36:52
To: Lingzhe Sun
Cc: Mich Talebzadeh, user
Subject: Re: Re: spark streaming and kinesis integration

Hi Lingzhe,

We have also started using this operator.
Do you see any issues with it?




On Wed, 12 Apr, 2023, 7:25 am Lingzhe Sun,  wrote:

Hi Mich,


FYI we're using spark 
operator(https://github.com/GoogleCloudPlatform/spark-on-k8s-operator) to build 
stateful structured streaming on k8s for a year. Haven't test it using 
non-operator way.


Besides that, the main contributor of the spark operator, Yinan Li, has been 
inactive for quite long time. Kind of worried that this project might finally 
become outdated as k8s is evolving. So if anyone is interested, please support 
the project.


Lingzhe Sun
Hirain Technologies

 
From: Mich Talebzadeh
Date: 2023-04-11 02:06
To: Rajesh Katkar
CC: user
Subject: Re: spark streaming and kinesis integration


What I said was this: "In so far as I know k8s does not support spark structured
streaming?"


So it is an open question. I just recalled it. I have not tested myself. I know 
structured streaming works on Google Dataproc cluster but I have not seen any 
official link that says Spark Structured Streaming is supported on k8s.


HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom




   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh
 
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.
 










On Mon, 10 Apr 2023 at 06:31, Rajesh Katkar  wrote:

Do you have any link or ticket which justifies that k8s does not support spark 
streaming ?

On Thu, 6 Apr, 2023, 9:15 pm Mich Talebzadeh,  wrote:

Do you have a high level diagram of the proposed solution?

In so far as I know k8s does not support spark structured streaming?

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom




   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh
 
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.
 










On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar  wrote:

Use case is , we want to read/write to kinesis streams using k8sOfficially I 
could not find the connector or reader for kinesis from spark like it has for 
kafka.


Checking here if anyone used kinesis and spark streaming combination ?


On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh,  wrote:

Hi Rajesh,

What is the use case for Kinesis here? I have not used it personally, Which use 
case it concerns


https://aws.amazon.com/kinesis/



Can you use something else instead?


HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom




   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh
 
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.
 










On Thu, 6 Apr 2023 at 13:08, Rajesh Katkar  wrote:

Hi Spark Team,
We need to read/write the kinesis streams using spark streaming.
 We checked the official documentation - 
https://spark.apache.org/docs/latest/streaming-kinesis-integration.html
It does not mention kinesis connector. Alternative is - 
https://github.com/qubole/kinesis-sql which is not active now.  This is now 
handed over here - https://github.com/roncemer/spark-sql-kinesis
Also according to SPARK-18165 , Spark officially do not have any kinesis 
connector 
We have few below questions , It would be great if you can answer 
Does Spark provides officially any kinesis connector which have 
readstream/writestream and endorse any connector for production use cases ?  
https://spark.apache.org/docs/latest/streaming-kinesis-integration.html This 
documentation does not mention how to write to kinesis. This method has default 
dynamodb as checkpoint, can we override it ?We have rocksdb as a state store 
but when we ran an application using official  
https://spark.apache.org/docs/latest/streaming-kinesis-integration.html rocksdb 
configura

Re: Re: spark streaming and kinesis integration

2023-04-12 Thread Yi Huang
unsubscribe

On Wed, Apr 12, 2023 at 3:59 PM Rajesh Katkar 
wrote:

> Hi Lingzhe,
>
> We have also started using this operator.
> Do you see any issues with it?
>
>
> On Wed, 12 Apr, 2023, 7:25 am Lingzhe Sun,  wrote:
>
>> Hi Mich,
>>
>> FYI we've been using the spark operator (
>> https://github.com/GoogleCloudPlatform/spark-on-k8s-operator) to build
>> stateful structured streaming on k8s for a year. We haven't tested it the
>> non-operator way.
>>
>> Besides that, the main contributor of the spark operator, Yinan Li, has
>> been inactive for quite a long time. We are somewhat worried that this project
>> might eventually become outdated as k8s evolves. So if anyone is interested,
>> please support the project.
>>
>> --
>> Lingzhe Sun
>> Hirain Technologies
>>
>>
>> *From:* Mich Talebzadeh 
>> *Date:* 2023-04-11 02:06
>> *To:* Rajesh Katkar 
>> *CC:* user 
>> *Subject:* Re: spark streaming and kinesis integration
>> What I said was this
>> "In so far as I know k8s does not support spark structured streaming?"
>>
>> So it is an open question. I just recalled it. I have not tested myself.
>> I know structured streaming works on Google Dataproc cluster but I have not
>> seen any official link that says Spark Structured Streaming is supported on
>> k8s.
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Mon, 10 Apr 2023 at 06:31, Rajesh Katkar 
>> wrote:
>>
>>> Do you have any link or ticket which justifies that k8s does not support
>>> spark streaming ?
>>>
>>> On Thu, 6 Apr, 2023, 9:15 pm Mich Talebzadeh, 
>>> wrote:
>>>
>>>> Do you have a high level diagram of the proposed solution?
>>>>
>>>> In so far as I know k8s does not support spark structured streaming?
>>>>
>>>> Mich Talebzadeh,
>>>> Lead Solutions Architect/Engineering Lead
>>>> Palantir Technologies
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>>view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar 
>>>> wrote:
>>>>
>>>>> Use case is , we want to read/write to kinesis streams using k8s
>>>>> Officially I could not find the connector or reader for kinesis from
>>>>> spark like it has for kafka.
>>>>>
>>>>> Checking here if anyone used kinesis and spark streaming combination ?
>>>>>
>>>>> On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh, <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> Hi Rajesh,
>>>>>>
>>>>>> What is the use case for Kinesis here? I have not used it personally,
>>>>>> Which use case it concerns
>>>>>>
>>>>>> https://aws.amazon.com/kinesis/
>>>>>>
>>>>>> Can you use something else instead?
>>>>>>
>>>>>> HTH
>>>>>>
>>>>>> Mich Talebzadeh,
>>>>>> Lead Solutions Architect/Engineering Lead
>>>>>> Palantir Tec

Re: Re: spark streaming and kinesis integration

2023-04-12 Thread Rajesh Katkar
Hi Lingzhe,

We have also started using this operator.
Do you see any issues with it?


On Wed, 12 Apr, 2023, 7:25 am Lingzhe Sun,  wrote:

> Hi Mich,
>
> FYI we've been using the spark operator (
> https://github.com/GoogleCloudPlatform/spark-on-k8s-operator) to build
> stateful structured streaming on k8s for a year. We haven't tested it the
> non-operator way.
>
> Besides that, the main contributor of the spark operator, Yinan Li, has
> been inactive for quite a long time. We are somewhat worried that this project
> might eventually become outdated as k8s evolves. So if anyone is interested,
> please support the project.
>
> --
> Lingzhe Sun
> Hirain Technologies
>
>
> *From:* Mich Talebzadeh 
> *Date:* 2023-04-11 02:06
> *To:* Rajesh Katkar 
> *CC:* user 
> *Subject:* Re: spark streaming and kinesis integration
> What I said was this
> "In so far as I know k8s does not support spark structured streaming?"
>
> So it is an open question. I just recalled it. I have not tested myself. I
> know structured streaming works on Google Dataproc cluster but I have not
> seen any official link that says Spark Structured Streaming is supported on
> k8s.
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 10 Apr 2023 at 06:31, Rajesh Katkar 
> wrote:
>
>> Do you have any link or ticket which justifies that k8s does not support
>> spark streaming ?
>>
>> On Thu, 6 Apr, 2023, 9:15 pm Mich Talebzadeh, 
>> wrote:
>>
>>> Do you have a high level diagram of the proposed solution?
>>>
>>> In so far as I know k8s does not support spark structured streaming?
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar 
>>> wrote:
>>>
>>>> Use case is , we want to read/write to kinesis streams using k8s
>>>> Officially I could not find the connector or reader for kinesis from
>>>> spark like it has for kafka.
>>>>
>>>> Checking here if anyone used kinesis and spark streaming combination ?
>>>>
>>>> On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh, <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi Rajesh,
>>>>>
>>>>> What is the use case for Kinesis here? I have not used it personally,
>>>>> Which use case it concerns
>>>>>
>>>>> https://aws.amazon.com/kinesis/
>>>>>
>>>>> Can you use something else instead?
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Lead Solutions Architect/Engineering Lead
>>>>> Palantir Technologies
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>>
>>>>>view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or an

Re: Re: spark streaming and kinesis integration

2023-04-11 Thread Lingzhe Sun
Hi Mich,

FYI we've been using the spark
operator (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator) to build
stateful structured streaming on k8s for a year. We haven't tested it the
non-operator way.

Besides that, the main contributor of the spark operator, Yinan Li, has been
inactive for quite a long time. We are somewhat worried that this project might
eventually become outdated as k8s evolves. So if anyone is interested, please support
the project.



Lingzhe Sun
Hirain Technologies
 
From: Mich Talebzadeh
Date: 2023-04-11 02:06
To: Rajesh Katkar
CC: user
Subject: Re: spark streaming and kinesis integration
What I said was this
"In so far as I know k8s does not support spark structured streaming?"

So it is an open question. I just recalled it. I have not tested myself. I know 
structured streaming works on Google Dataproc cluster but I have not seen any 
official link that says Spark Structured Streaming is supported on k8s.

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom

   view my Linkedin profile

 https://en.everybodywiki.com/Mich_Talebzadeh
 
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
 


On Mon, 10 Apr 2023 at 06:31, Rajesh Katkar  wrote:
Do you have any link or ticket which justifies that k8s does not support spark 
streaming ?

On Thu, 6 Apr, 2023, 9:15 pm Mich Talebzadeh,  wrote:
Do you have a high level diagram of the proposed solution?

In so far as I know k8s does not support spark structured streaming?

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom

   view my Linkedin profile

 https://en.everybodywiki.com/Mich_Talebzadeh
 
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
 


On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar  wrote:
Use case is , we want to read/write to kinesis streams using k8s
Officially I could not find the connector or reader for kinesis from spark like 
it has for kafka.

Checking here if anyone used kinesis and spark streaming combination ?

On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh,  wrote:
Hi Rajesh,

What is the use case for Kinesis here? I have not used it personally, Which use 
case it concerns

https://aws.amazon.com/kinesis/

Can you use something else instead?

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom

   view my Linkedin profile

 https://en.everybodywiki.com/Mich_Talebzadeh
 
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
 


On Thu, 6 Apr 2023 at 13:08, Rajesh Katkar  wrote:
Hi Spark Team,
We need to read/write the kinesis streams using spark streaming.
 We checked the official documentation - 
https://spark.apache.org/docs/latest/streaming-kinesis-integration.html
It does not mention kinesis connector. Alternative is - 
https://github.com/qubole/kinesis-sql which is not active now.  This is now 
handed over here - https://github.com/roncemer/spark-sql-kinesis
Also according to SPARK-18165 , Spark officially do not have any kinesis 
connector 
We have a few questions below; it would be great if you could answer them:
1. Does Spark officially provide any kinesis connector with readStream/writeStream
support, and does it endorse any connector for production use cases?
2. https://spark.apache.org/docs/latest/streaming-kinesis-integration.html This
documentation does not mention how to write to kinesis. This method has default
dynamodb as checkpoint, can we override it?
3. We have rocksdb as a state store, but when we ran an application using the official
https://spark.apache.org/docs/latest/streaming-kinesis-integration.html approach, rocksdb
configurations were not effective. Can you please confirm if rocksdb is not
applicable in these cases?
4. rocksdb however works with the qubole connector; do you have any plan to release
a kinesis connector?
5. Please help/recommend us any good stable kinesis connector or some pointers
around it


Re: spark streaming and kinesis integration

2023-04-10 Thread Mich Talebzadeh
Just to clarify, a major benefit of k8s in this case is to host your Spark
applications in the form of containers in an automated fashion so that one
can easily deploy as many instances of the application as required
(autoscaling). From below:

https://price2meet.com/gcp/docs/dataproc_docs_concepts_configuring-clusters_autoscaling.pdf

Autoscaling does not support Spark Structured Streaming (
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
(see Autoscaling and Spark Structured Streaming
(#autoscaling_and_spark_structured_streaming)) .

By the same token, k8s is more suitable (as of now) for batch jobs than
Spark Structured Streaming.
https://issues.apache.org/jira/browse/SPARK-12133

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 10 Apr 2023 at 19:06, Mich Talebzadeh 
wrote:

> What I said was this
> "In so far as I know k8s does not support spark structured streaming?"
>
> So it is an open question. I just recalled it. I have not tested myself. I
> know structured streaming works on Google Dataproc cluster but I have not
> seen any official link that says Spark Structured Streaming is supported on
> k8s.
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 10 Apr 2023 at 06:31, Rajesh Katkar 
> wrote:
>
>> Do you have any link or ticket which justifies that k8s does not support
>> spark streaming ?
>>
>> On Thu, 6 Apr, 2023, 9:15 pm Mich Talebzadeh, 
>> wrote:
>>
>>> Do you have a high level diagram of the proposed solution?
>>>
>>> In so far as I know k8s does not support spark structured streaming?
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar 
>>> wrote:
>>>
>>>> Use case is , we want to read/write to kinesis streams using k8s
>>>> Officially I could not find the connector or reader for kinesis from
>>>> spark like it has for kafka.
>>>>
>>>> Checking here if anyone used kinesis and spark streaming combination ?
>>>>
>>>> On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh, <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi Rajesh,
>>>>>
>>>>> What is the use case for Kinesis here? I have not used it personally,
>>>>> Which use case it concerns
>>>>>
>>>>> https://aws.amazon.com/kinesis/
>>>>>
>>>>> Can you use something else instead?
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Lead Solutions Architect/Engineering Lead
>>>>> Palantir Technologies
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>

Re: spark streaming and kinesis integration

2023-04-10 Thread Mich Talebzadeh
What I said was this
"In so far as I know k8s does not support spark structured streaming?"

So it is an open question. I just recalled it. I have not tested myself. I
know structured streaming works on Google Dataproc cluster but I have not
seen any official link that says Spark Structured Streaming is supported on
k8s.

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 10 Apr 2023 at 06:31, Rajesh Katkar  wrote:

> Do you have any link or ticket which justifies that k8s does not support
> spark streaming ?
>
> On Thu, 6 Apr, 2023, 9:15 pm Mich Talebzadeh, 
> wrote:
>
>> Do you have a high level diagram of the proposed solution?
>>
>> In so far as I know k8s does not support spark structured streaming?
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar 
>> wrote:
>>
>>> Use case is , we want to read/write to kinesis streams using k8s
>>> Officially I could not find the connector or reader for kinesis from
>>> spark like it has for kafka.
>>>
>>> Checking here if anyone used kinesis and spark streaming combination ?
>>>
>>> On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh, 
>>> wrote:
>>>
>>>> Hi Rajesh,
>>>>
>>>> What is the use case for Kinesis here? I have not used it personally,
>>>> Which use case it concerns
>>>>
>>>> https://aws.amazon.com/kinesis/
>>>>
>>>> Can you use something else instead?
>>>>
>>>> HTH
>>>>
>>>> Mich Talebzadeh,
>>>> Lead Solutions Architect/Engineering Lead
>>>> Palantir Technologies
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>>view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, 6 Apr 2023 at 13:08, Rajesh Katkar 
>>>> wrote:
>>>>
>>>>> Hi Spark Team,
>>>>>
>>>>> We need to read/write the kinesis streams using spark streaming.
>>>>>
>>>>>  We checked the official documentation -
>>>>> https://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>>>>
>>>>> It does not mention kinesis connector. Alternative is -
>>>>> https://github.com/qubole/kinesis-sql which is not active now.  This
>>>>> is now handed over here -
>>>>> https://github.com/roncemer/spark-sql-kinesis
>>>>>
>>>>> Also according to SPARK-18165
>>>>> <https://issues.apache.org/jira/browse/SPARK-18165> , Spark
>>>>> officially do not have any kinesis connector
>>>>>
>>>>> We have few below questions , It would be great if you can answer
>>>>>
>>>>>1. Does Spark provides officially any kinesis connector which have
>>>>>readstream/writestream and endorse any connector for production use 
>>>>> cases ?
>>>>>
>>>>>2.
>>>>>
>>>>> https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
>>>>> This
>>>>>documentation does not mention how to write to kinesis. This method has
>>>>>default dynamodb as checkpoint, can we override it ?
>>>>>3. We have rocksdb as a state store but when we ran an application
>>>>>using official
>>>>>
>>>>> https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
>>>>> rocksdb
>>>>>configurations were not effective. Can you please confirm if rocksdb 
>>>>> is not
>>>>>applicable in these cases?
>>>>>4. rocksdb however works with qubole connector , do you have any
>>>>>plan to release kinesis connector?
>>>>>5. Please help/recommend us for any good stable kinesis connector
>>>>>or some pointers around it
>>>>>
>>>>>


Re: spark streaming and kinesis integration

2023-04-10 Thread Rajesh Katkar
Do you have any link or ticket which justifies that k8s does not support
spark streaming ?

On Thu, 6 Apr, 2023, 9:15 pm Mich Talebzadeh, 
wrote:

> Do you have a high level diagram of the proposed solution?
>
> In so far as I know k8s does not support spark structured streaming?
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar 
> wrote:
>
>> Use case is , we want to read/write to kinesis streams using k8s
>> Officially I could not find the connector or reader for kinesis from
>> spark like it has for kafka.
>>
>> Checking here if anyone used kinesis and spark streaming combination ?
>>
>> On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh, 
>> wrote:
>>
>>> Hi Rajesh,
>>>
>>> What is the use case for Kinesis here? I have not used it personally,
>>> Which use case it concerns
>>>
>>> https://aws.amazon.com/kinesis/
>>>
>>> Can you use something else instead?
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 6 Apr 2023 at 13:08, Rajesh Katkar 
>>> wrote:
>>>
>>>> Hi Spark Team,
>>>>
>>>> We need to read/write the kinesis streams using spark streaming.
>>>>
>>>>  We checked the official documentation -
>>>> https://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>>>
>>>> It does not mention kinesis connector. Alternative is -
>>>> https://github.com/qubole/kinesis-sql which is not active now.  This
>>>> is now handed over here - https://github.com/roncemer/spark-sql-kinesis
>>>>
>>>> Also according to SPARK-18165
>>>> <https://issues.apache.org/jira/browse/SPARK-18165> , Spark officially
>>>> do not have any kinesis connector
>>>>
>>>> We have few below questions , It would be great if you can answer
>>>>
>>>>1. Does Spark provides officially any kinesis connector which have
>>>>readstream/writestream and endorse any connector for production use 
>>>> cases ?
>>>>
>>>>2.
>>>>https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
>>>> This
>>>>documentation does not mention how to write to kinesis. This method has
>>>>default dynamodb as checkpoint, can we override it ?
>>>>3. We have rocksdb as a state store but when we ran an application
>>>>using official
>>>>https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
>>>> rocksdb
>>>>configurations were not effective. Can you please confirm if rocksdb is 
>>>> not
>>>>applicable in these cases?
>>>>4. rocksdb however works with qubole connector , do you have any
>>>>plan to release kinesis connector?
>>>>5. Please help/recommend us for any good stable kinesis connector
>>>>or some pointers around it
>>>>
>>>>


Re: spark streaming and kinesis integration

2023-04-06 Thread Rajesh Katkar
Use case is , we want to read/write to kinesis streams using k8s
Officially I could not find the connector or reader for kinesis from spark
like it has for kafka.

Checking here if anyone used kinesis and spark streaming combination ?
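
For what it's worth, the third-party connectors discussed further down in this thread
expose Kinesis as a regular Structured Streaming source/sink. Below is a minimal Scala
sketch assuming the qubole/kinesis-sql fork (or its successors) with the source name
"kinesis" and the option keys from that project's README -- none of this is part of
Apache Spark itself, and the stream name, endpoint and checkpoint path are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kinesis-structured-streaming-sketch")
  .getOrCreate()

// Read from a Kinesis stream via the community connector (not bundled with Spark).
val events = spark.readStream
  .format("kinesis")
  .option("streamName", "my-input-stream")                          // placeholder
  .option("endpointUrl", "https://kinesis.eu-west-1.amazonaws.com") // placeholder region
  .option("startingposition", "TRIM_HORIZON")
  .load()

// The connector exposes the record payload as a binary `data` column.
val parsed = events.selectExpr(
  "CAST(data AS STRING) AS payload",
  "approximateArrivalTimestamp")

// Console sink keeps the sketch self-contained; swap in a real sink for actual use.
val query = parsed.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/kinesis-checkpoint")          // placeholder
  .start()

query.awaitTermination()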

On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh, 
wrote:

> Hi Rajesh,
>
> What is the use case for Kinesis here? I have not used it personally,
> Which use case it concerns
>
> https://aws.amazon.com/kinesis/
>
> Can you use something else instead?
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 6 Apr 2023 at 13:08, Rajesh Katkar 
> wrote:
>
>> Hi Spark Team,
>>
>> We need to read/write the kinesis streams using spark streaming.
>>
>>  We checked the official documentation -
>> https://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>
>> It does not mention kinesis connector. Alternative is -
>> https://github.com/qubole/kinesis-sql which is not active now.  This is
>> now handed over here - https://github.com/roncemer/spark-sql-kinesis
>>
>> Also according to SPARK-18165
>> <https://issues.apache.org/jira/browse/SPARK-18165> , Spark officially
>> do not have any kinesis connector
>>
>> We have few below questions , It would be great if you can answer
>>
>>1. Does Spark provides officially any kinesis connector which have
>>readstream/writestream and endorse any connector for production use cases 
>> ?
>>
>>2.
>>https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
>> This
>>documentation does not mention how to write to kinesis. This method has
>>default dynamodb as checkpoint, can we override it ?
>>3. We have rocksdb as a state store but when we ran an application
>>using official
>>https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
>> rocksdb
>>configurations were not effective. Can you please confirm if rocksdb is 
>> not
>>applicable in these cases?
>>4. rocksdb however works with qubole connector , do you have any plan
>>to release kinesis connector?
>>5. Please help/recommend us for any good stable kinesis connector or
>>some pointers around it
>>
>>


RE: spark streaming and kinesis integration

2023-04-06 Thread Jonske, Kurt
unsubscribe

Regards,
Kurt Jonske
Senior Director
Alvarez & Marsal
Direct:  212 328 8532
Mobile:  312 560 5040
Email:  kjon...@alvarezandmarsal.com<mailto:kjon...@alvarezandmarsal.com>
www.alvarezandmarsal.com

From: Mich Talebzadeh 
Sent: Thursday, April 06, 2023 11:45 AM
To: Rajesh Katkar 
Cc: u...@spark.incubator.apache.org
Subject: Re: spark streaming and kinesis integration



Do you have a high level diagram of the proposed solution?

In so far as I know k8s does not support spark structured streaming?

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom


 
   view my Linkedin profile

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar 
mailto:katkar.raj...@gmail.com>> wrote:
Use case is , we want to read/write to kinesis streams using k8s
Officially I could not find the connector or reader for kinesis from spark like 
it has for kafka.

Checking here if anyone used kinesis and spark streaming combination ?

On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh, 
mailto:mich.talebza...@gmail.com>> wrote:
Hi Rajesh,

What is the use case for Kinesis here? I have not used it personally, Which use 
case it concerns

https://aws.amazon.com/kinesis/

Can you use something else instead?

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom


 
   view my Linkedin profile

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Thu, 6 Apr 2023 at 13:08, Rajesh Katkar 
mailto:katkar.raj...@gmail.com>> wrote:

Hi Spark Team,

We need to read/write the kinesis streams using spark streaming.

 We checked the official documentation - 
https://spark.apache.org/docs/latest/streaming-kinesis-integration.html

It does not mention kinesis connector. Alternative is - 
https://github.com/qubole/kinesis-sql
 which is not active now.  This is now handed over here - 
https://github.com/roncemer/spark-sql-kinesis

Also according to 
SPARK-18165
 , Spark officially do not have any kinesis connector

We have few below questions , It would be great if you can answer

  1.  Does Spark provides officially any kinesis connector which have 
readstream/writestream and endorse any connector for production use cases ?
  2.  
https://spark.apache.org/docs/latest/streaming-kinesis-integration.html
 This documentation does not mention how to write to kinesis. This method has 
default dynamodb as checkpoint, can we override it ?
  3.  We have rocksdb as a state store but when we ran an application using 
official  
https://spark.apache.org/docs/latest/streaming-kinesis-integration.html
 rocksdb configurations were not effective. Can you please confirm if rocksdb 
is not applicable in these cases?
  4.  rocksdb however works with qubole connector , do you have any plan to 
release kinesis connector?
  5.  Please help/recommend us for any good stable kinesis connector or some 
pointers around it


Re: spark streaming and kinesis integration

2023-04-06 Thread Mich Talebzadeh
Do you have a high level diagram of the proposed solution?

In so far as I know k8s does not support spark structured streaming?

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 6 Apr 2023 at 16:40, Rajesh Katkar  wrote:

> Use case is , we want to read/write to kinesis streams using k8s
> Officially I could not find the connector or reader for kinesis from spark
> like it has for kafka.
>
> Checking here if anyone used kinesis and spark streaming combination ?
>
> On Thu, 6 Apr, 2023, 7:23 pm Mich Talebzadeh, 
> wrote:
>
>> Hi Rajesh,
>>
>> What is the use case for Kinesis here? I have not used it personally,
>> Which use case it concerns
>>
>> https://aws.amazon.com/kinesis/
>>
>> Can you use something else instead?
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 6 Apr 2023 at 13:08, Rajesh Katkar 
>> wrote:
>>
>>> Hi Spark Team,
>>>
>>> We need to read/write the kinesis streams using spark streaming.
>>>
>>>  We checked the official documentation -
>>> https://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>>
>>> It does not mention kinesis connector. Alternative is -
>>> https://github.com/qubole/kinesis-sql which is not active now.  This is
>>> now handed over here - https://github.com/roncemer/spark-sql-kinesis
>>>
>>> Also according to SPARK-18165
>>> <https://issues.apache.org/jira/browse/SPARK-18165> , Spark officially
>>> do not have any kinesis connector
>>>
>>> We have few below questions , It would be great if you can answer
>>>
>>>1. Does Spark provides officially any kinesis connector which have
>>>readstream/writestream and endorse any connector for production use 
>>> cases ?
>>>
>>>2.
>>>https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
>>> This
>>>documentation does not mention how to write to kinesis. This method has
>>>default dynamodb as checkpoint, can we override it ?
>>>3. We have rocksdb as a state store but when we ran an application
>>>using official
>>>https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
>>> rocksdb
>>>configurations were not effective. Can you please confirm if rocksdb is 
>>> not
>>>applicable in these cases?
>>>4. rocksdb however works with qubole connector , do you have any
>>>plan to release kinesis connector?
>>>5. Please help/recommend us for any good stable kinesis connector or
>>>some pointers around it
>>>
>>>


Re: spark streaming and kinesis integration

2023-04-06 Thread Mich Talebzadeh
Hi Rajesh,

What is the use case for Kinesis here? I have not used it personally, Which
use case it concerns

https://aws.amazon.com/kinesis/

Can you use something else instead?

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 6 Apr 2023 at 13:08, Rajesh Katkar  wrote:

> Hi Spark Team,
>
> We need to read/write the kinesis streams using spark streaming.
>
>  We checked the official documentation -
> https://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>
> It does not mention kinesis connector. Alternative is -
> https://github.com/qubole/kinesis-sql which is not active now.  This is
> now handed over here - https://github.com/roncemer/spark-sql-kinesis
>
> Also according to SPARK-18165
> <https://issues.apache.org/jira/browse/SPARK-18165> , Spark officially do
> not have any kinesis connector
>
> We have few below questions , It would be great if you can answer
>
>1. Does Spark provides officially any kinesis connector which have
>readstream/writestream and endorse any connector for production use cases ?
>
>2.
>https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
> This
>documentation does not mention how to write to kinesis. This method has
>default dynamodb as checkpoint, can we override it ?
>3. We have rocksdb as a state store but when we ran an application
>using official
>https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 
> rocksdb
>configurations were not effective. Can you please confirm if rocksdb is not
>applicable in these cases?
>4. rocksdb however works with qubole connector , do you have any plan
>to release kinesis connector?
>5. Please help/recommend us for any good stable kinesis connector or
>some pointers around it
>
>


spark streaming and kinesis integration

2023-04-06 Thread Rajesh Katkar
Hi Spark Team,

We need to read/write the kinesis streams using spark streaming.

 We checked the official documentation -
https://spark.apache.org/docs/latest/streaming-kinesis-integration.html

It does not mention kinesis connector. Alternative is -
https://github.com/qubole/kinesis-sql which is not active now.  This is now
handed over here - https://github.com/roncemer/spark-sql-kinesis

Also, according to SPARK-18165
<https://issues.apache.org/jira/browse/SPARK-18165>, Spark officially does
not have any kinesis connector.

We have a few questions below; it would be great if you could answer them:

   1. Does Spark officially provide any kinesis connector with
   readStream/writeStream support, and does it endorse any connector for
   production use cases?

   2.
   https://spark.apache.org/docs/latest/streaming-kinesis-integration.html This
   documentation does not mention how to write to kinesis. This method has
   default dynamodb as checkpoint, can we override it?
   3. We have rocksdb as a state store, but when we ran an application using
   the official
   https://spark.apache.org/docs/latest/streaming-kinesis-integration.html
   approach, rocksdb
   configurations were not effective. Can you please confirm if rocksdb is not
   applicable in these cases?
   4. rocksdb however works with the qubole connector; do you have any plan to
   release a kinesis connector?
   5. Please help/recommend us any good stable kinesis connector or
   some pointers around it
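
For reference on point 2 above: the DStream-based integration shipped in
spark-streaming-kinesis-asl builds the receiver roughly as in the sketch below (taken
from the documented builder API, not a tested job; the stream, region and application
names are placeholders). As far as I recall, the KCL-based receiver always checkpoints
to DynamoDB, but checkpointAppName controls which DynamoDB table is used, so the table
can at least be overridden per application:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

val sparkConf = new SparkConf().setAppName("kinesis-dstream-sketch")
val ssc = new StreamingContext(sparkConf, Seconds(10))

val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-input-stream")                          // placeholder
  .endpointUrl("https://kinesis.eu-west-1.amazonaws.com") // placeholder region
  .regionName("eu-west-1")                                // placeholder region
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .checkpointAppName("my-kinesis-app") // becomes the DynamoDB table used for KCL checkpoints
  .checkpointInterval(Seconds(10))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .build()

// Each record arrives as Array[Byte]; decode and process as needed.
kinesisStream.map(bytes => new String(bytes)).print()

ssc.start()
ssc.awaitTermination()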


Re: Re: should one ever make a spark streaming job in pyspark

2022-11-03 Thread Lingzhe Sun
In addition to that:

For now, some stateful operations in structured streaming don't have an equivalent 
Python API, e.g. flatMapGroupsWithState. However, Spark engineers are making it 
possible in an upcoming version. See more: 
https://www.databricks.com/blog/2022/10/18/python-arbitrary-stateful-processing-structured-streaming.html
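
For anyone following along, this is roughly what the Scala-only API mentioned above
looks like. A minimal sketch using the built-in rate source so it runs without any
external system; the Event/KeyCount types and the per-key counting logic are made up
for illustration:

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(key: String, ts: Timestamp)
case class KeyCount(key: String, count: Long)

val spark = SparkSession.builder().appName("flatMapGroupsWithState-sketch").getOrCreate()
import spark.implicits._

// Built-in rate source: one row per tick with a `timestamp` and an increasing `value`.
val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()
  .selectExpr("CAST(value % 10 AS STRING) AS key", "timestamp AS ts")
  .as[Event]

// Keep a running count per key in arbitrary state; emit the updated count each micro-batch.
val counts = events
  .groupByKey(_.key)
  .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.NoTimeout()) {
    (key: String, batch: Iterator[Event], state: GroupState[Long]) =>
      val updated = state.getOption.getOrElse(0L) + batch.size
      state.update(updated)
      Iterator(KeyCount(key, updated))
  }

counts.writeStream
  .outputMode("update")
  .format("console")
  .start()
  .awaitTermination()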



Best Regards!
...
Lingzhe Sun 
Hirain Technology / APIC
 
From: Mich Talebzadeh
Date: 2022-11-03 19:15
To: Joris Billen
CC: User
Subject: Re: should one ever make a spark streaming job in pyspark
Well your mileage varies so to speak.

- Spark itself is written in Scala. However, that does not imply you should stick 
with Scala.
- I have used both for Spark Streaming and Spark Structured Streaming, and they both 
work fine.
- PySpark has become popular with the widespread use of Data Science projects.
- What matters normally is the skill set you already have in-house. The 
likelihood is that there are more Python developers than Scala developers, and 
the learning curve for Scala has to be taken into account.
- The idea of performance etc. is tangential.
- With regard to the Spark code itself, there should be little effort in 
converting from Scala to PySpark or vice versa.
HTH

   view my Linkedin profile

  
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
 


On Wed, 2 Nov 2022 at 08:54, Joris Billen  wrote:
Dear community, 
I had a general question about the use of scala VS pyspark for spark streaming.
I believe spark streaming will work most efficiently when written in scala. I 
believe however that things can be implemented in pyspark. My question: 
1)is it completely dumb to make a streaming job in pyspark? 
2)what are the technical reasons that it is done best in scala (is this easy to 
understand why)? 
3)any good links anyone has seen with numbers of the difference in performance 
and under what circumstances+explanation?
4)are there certain scenarios when the use of pyspark can be motivated (maybe 
when someone doesn’t feel comfortable writing a job in scala and the number of 
messages/minute aren’t gigantic so performance isn’t that crucial?)

Thanks for any input!
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: should one ever make a spark streaming job in pyspark

2022-11-03 Thread Mich Talebzadeh
Well your mileage varies so to speak.


   - Spark itself is written in Scala. However, that does not imply you
   should stick with Scala.
   - I have used both for spark streaming and spark structured streaming,
   and they both work fine
   - PySpark has become popular with the widespread use of Data Science
   projects
   - What matters normally is the skill set you already have in-house. The
   likelihood is that there are more Python developers than Scala developers
   and the learning curve for scala has to be taken into account
   - The idea of performance etc is tangential.
   -  With regard to the Spark code itself, there should be little efforts
   in converting from Scala to PySpark or vice-versa
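
To illustrate that last point, here is a minimal Structured Streaming pipeline in Scala
using the built-in rate source (a sketch, not production code); the PySpark version is
essentially the same chain of calls in Python syntax, which is why porting between the
two is usually mechanical:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("scala-vs-pyspark-sketch").getOrCreate()

// Built-in "rate" source: no external system needed for the comparison.
val counts = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .withColumn("bucket", col("value") % 5)
  .groupBy("bucket")
  .count()

val query = counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

// The PySpark equivalent is the same method chain, e.g.
//   spark.readStream.format("rate").option("rowsPerSecond", 10).load() \
//     .withColumn("bucket", col("value") % 5).groupBy("bucket").count() \
//     .writeStream.outputMode("complete").format("console").start()
query.awaitTermination()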

HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 2 Nov 2022 at 08:54, Joris Billen 
wrote:

> Dear community,
> I had a general question about the use of scala VS pyspark for spark
> streaming.
> I believe spark streaming will work most efficiently when written in
> scala. I believe however that things can be implemented in pyspark. My
> question:
> 1)is it completely dumb to make a streaming job in pyspark?
> 2)what are the technical reasons that it is done best in scala (is this
> easy to understand why)?
> 3)any good links anyone has seen with numbers of the difference in
> performance and under what circumstances+explanation?
> 4)are there certain scenarios when the use of pyspark can be motivated
> (maybe when someone doesn’t feel comfortable writing a job in scala and the
> number of messages/minute aren’t gigantic so performance isn’t that crucial?)
>
> Thanks for any input!
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


should one ever make a spark streaming job in pyspark

2022-11-02 Thread Joris Billen
Dear community, 
I had a general question about the use of scala VS pyspark for spark streaming.
I believe spark streaming will work most efficiently when written in scala. I 
believe however that things can be implemented in pyspark. My question: 
1)is it completely dumb to make a streaming job in pyspark? 
2)what are the technical reasons that it is done best in scala (is this easy to 
understand why)? 
3)any good links anyone has seen with numbers of the difference in performance 
and under what circumstances+explanation?
4)are there certain scenarios when the use of pyspark can be motivated (maybe 
when someone doesn’t feel comfortable writing a job in scala and the number of 
messages/minute aren’t gigantic so performance isn’t that crucial?)

Thanks for any input!
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[PySpark, Spark Streaming] Bug in timestamp handling in Structured Streaming?

2022-10-21 Thread kai-michael.roes...@sap.com.INVALID
Hi,

I suspect I may have come across a bug in the handling of data containing 
timestamps in PySpark "Structured Streaming" using the foreach option. I'm 
"just" a user of PySpark, no Spark community member, so I don't know how to 
properly address the issue. I have posted a question about this on StackOverflow 
but that didn't get any attention yet. Could 
someone please have a look at it to check whether it is really a bug? In case a 
Jira ticket is created could you please send me the link?
Thanks and best regards
Kai Roesner.
Dr. Kai-Michael Roesner
Development Architect
Technology & Innovation, Common Data Services
SAP SE
Robert-Bosch-Strasse 30/34
69190 Walldorf, Germany
T +49 6227 7-64216
F +49 6227 78-28459
E kai-michael.roes...@sap.com
www.sap.com

Please consider the impact on the environment before printing this e-mail.

Pflichtangaben/Mandatory Disclosure Statements:
www.sap.com/corporate-en/impressum


This e-mail may contain trade secrets or privileged, undisclosed, or otherwise 
confidential information. If you have received this e-mail in error, you are 
hereby notified that any review, copying, or distribution of it is strictly 
prohibited. Please inform us immediately and destroy the original transmittal. 
Thank you for your cooperation.



Re: Updating Broadcast Variable in Spark Streaming 2.4.4

2022-09-28 Thread Sean Owen
I don't think that can work. Your BroadcastUpdater is copied to the task,
with a reference to an initial broadcast. When that is later updated on the
driver, this does not affect the broadcast inside the copy in the tasks.
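
A commonly suggested workaround (a sketch, not the original poster's code, and it
assumes the configuration can be cheaply re-read on the driver): since the function
passed to foreachRDD runs on the driver once per batch, you can re-create the broadcast
there whenever the config changes, so each batch's tasks close over a fresh broadcast
instead of relying on a mutated one:

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.DStream

// Driver-side helper: re-broadcasts the config whenever it changes.
// loadConfig() is a placeholder for however the configuration is actually fetched.
object ConfigBroadcast {
  @volatile private var current: Broadcast[Map[String, String]] = _
  @volatile private var lastSeen: Map[String, String] = _

  def get(sc: SparkContext, loadConfig: () => Map[String, String]): Broadcast[Map[String, String]] =
    synchronized {
      val latest = loadConfig()
      if (current == null || latest != lastSeen) {
        if (current != null) current.unpersist(blocking = false)
        lastSeen = latest
        current = sc.broadcast(latest) // new broadcast id, so executors fetch the fresh value
      }
      current
    }
}

// The foreachRDD body runs on the driver once per batch, so each batch's tasks
// capture whichever broadcast is current at that moment.
def process(stream: DStream[String], loadConfig: () => Map[String, String]): Unit = {
  stream.foreachRDD { rdd =>
    val configBc = ConfigBroadcast.get(rdd.sparkContext, loadConfig)
    rdd.foreachPartition { partition =>
      val config = configBc.value
      partition.foreach { record =>
        // per-record logic using config
      }
    }
  }
}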

On Wed, Sep 28, 2022 at 10:11 AM Dipl.-Inf. Rico Bergmann <
i...@ricobergmann.de> wrote:

> Hi folks!
>
>
> I'm trying to implement an update of a broadcast var in Spark Streaming.
> The idea is that whenever some configuration value has changed (this is
> periodically checked by the driver) the existing broadcast variable is
> unpersisted and then (re-)broadcasted.
>
> In a local test setup (using a local Spark) it works fine but on a real
> cluster it doesn't work. The broadcast variable never gets updated. What
> I can see after adding some log messages is that the BroadcastUpdater
> thread is only called twice and then never again. Anyone any idea why
> this happens?
>
> Code snippet:
>
> @RequiredArgsConstructor
> public class BroadcastUpdater implements Runnable {
>     private final transient JavaSparkContext sparkContext;
>     @Getter
>     private transient volatile Broadcast<Map<String, String>> broadcastVar;
>     private transient volatile Map<String, String> configMap;
>
>     public void run() {
>         Map<String, String> configMap = getConfigMap();
>         if (this.broadcastVar == null || !configMap.equals(this.configMap)) {
>             this.configMap = configMap;
>             if (broadcastVar != null) {
>                 broadcastVar.unpersist(true);
>                 broadcastVar.destroy(true);
>             }
>             this.broadcastVar = this.sparkContext.broadcast(this.configMap);
>         }
>     }
>
>     private Map<String, String> getConfigMap() {
>         // impl details
>     }
> }
>
> public class StreamingFunction implements Serializable {
>
>     private transient volatile BroadcastUpdater broadcastUpdater;
>     private transient ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
>             new ScheduledThreadPoolExecutor(1);
>
>     protected JavaStreamingContext startStreaming(JavaStreamingContext context,
>             ConsumerStrategy<String, ChangeDataRecord> consumerStrategy) {
>         broadcastUpdater = new BroadcastUpdater(context.sparkContext());
>         scheduledThreadPoolExecutor.scheduleWithFixedDelay(broadcastUpdater, 0, 3, TimeUnit.SECONDS);
>
>         final JavaInputDStream<ConsumerRecord<String, ChangeDataRecord>> inputStream =
>                 KafkaUtils.createDirectStream(context,
>                         LocationStrategies.PreferConsistent(), consumerStrategy);
>
>         inputStream.foreachRDD(rdd -> {
>             Broadcast<Map<String, String>> broadcastVar = broadcastUpdater.getBroadcastVar();
>             rdd.foreachPartition(partition -> {
>                 if (partition.hasNext()) {
>                     Map<String, String> configMap = broadcastVar.getValue();
>
>                     // iterate
>                     while (partition.hasNext()) {
>                         // impl logic using broadcast variable
>                     }
>                 }
>             });
>         });
>         return context;
>     }
> }
> }
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Updating Broadcast Variable in Spark Streaming 2.4.4

2022-09-28 Thread Dipl.-Inf. Rico Bergmann

Hi folks!


I'm trying to implement an update of a broadcast var in Spark Streaming. 
The idea is that whenever some configuration value has changed (this is 
periodically checked by the driver) the existing broadcast variable is 
unpersisted and then (re-)broadcasted.


In a local test setup (using a local Spark) it works fine but on a real 
cluster it doesn't work. The broadcast variable never gets updated. What 
I can see after adding some log messages is that the BroadcastUpdater 
thread is only called twice and then never again. Anyone any idea why 
this happens?


Code snippet:

@RequiredArgsConstructor
public class BroadcastUpdater implements Runnable {
    private final transient JavaSparkContext sparkContext;
    @Getter
    private transient volatile Broadcast<Map<String, String>> broadcastVar;
    private transient volatile Map<String, String> configMap;

    public void run() {
        Map<String, String> configMap = getConfigMap();
        if (this.broadcastVar == null || !configMap.equals(this.configMap)) {
            this.configMap = configMap;
            if (broadcastVar != null) {
                broadcastVar.unpersist(true);
                broadcastVar.destroy(true);
            }
            this.broadcastVar = this.sparkContext.broadcast(this.configMap);
        }
    }

    private Map<String, String> getConfigMap() {
        // impl details
    }
}

public class StreamingFunction implements Serializable {

    private transient volatile BroadcastUpdater broadcastUpdater;
    private transient ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
            new ScheduledThreadPoolExecutor(1);

    protected JavaStreamingContext startStreaming(JavaStreamingContext context,
            ConsumerStrategy<String, ChangeDataRecord> consumerStrategy) {
        broadcastUpdater = new BroadcastUpdater(context.sparkContext());
        scheduledThreadPoolExecutor.scheduleWithFixedDelay(broadcastUpdater, 0, 3, TimeUnit.SECONDS);

        final JavaInputDStream<ConsumerRecord<String, ChangeDataRecord>> inputStream =
                KafkaUtils.createDirectStream(context,
                        LocationStrategies.PreferConsistent(), consumerStrategy);

        inputStream.foreachRDD(rdd -> {
            Broadcast<Map<String, String>> broadcastVar = broadcastUpdater.getBroadcastVar();
            rdd.foreachPartition(partition -> {
                if (partition.hasNext()) {
                    Map<String, String> configMap = broadcastVar.getValue();

                    // iterate
                    while (partition.hasNext()) {
                        // impl logic using broadcast variable
                    }
                }
            });
        });
        return context;
    }
}



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Error - Spark STREAMING

2022-09-21 Thread Anupam Singh
Which version of spark are you using?

On Tue, Sep 20, 2022, 1:57 PM Akash Vellukai 
wrote:

> Hello,
>
>
>   py4j.protocol.Py4JJavaError: An error occurred while calling o80.load. :
> java.lang.NoClassDefFoundError:
> org/apache/spark/sql/internal/connector/SimpleTableProvider
>
>
> May anyone help Me to solve this issue.
>
>
> Thanks and regards
> Akash
>


Error - Spark STREAMING

2022-09-20 Thread Akash Vellukai
Hello,


  py4j.protocol.Py4JJavaError: An error occurred while calling o80.load. :
java.lang.NoClassDefFoundError:
org/apache/spark/sql/internal/connector/SimpleTableProvider


May anyone help Me to solve this issue.


Thanks and regards
Akash


Re: Spark streaming

2022-08-20 Thread Gourav Sengupta
Hi,
spark is just an unnecessary overengineered overkill for that kind of a
job. I know they are trying to make SPARK a one stop solution for
everything but that is a marketing attempt to capture market share, rather
than the true blue engineering creativity that led to the creation of SPARK
- so please be aware.

Are you in AWS? Please try DMS. If you are then that might be the best
solution depending on what you are looking for ofcourse.

If you are not in AWS, please let me know your environment, and I can help
you out.



Regards,
Gourav Sengupta

On Fri, Aug 19, 2022 at 1:13 PM sandra sukumaran <
sandrasukumara...@gmail.com> wrote:

> Dear Sir,
>
>
>
>  Is there any possible method to fetch MySQL database bin log, with
> the help of spark streaming.
> Kafka streaming is not applicable in this case.
>
>
>
> Thanks and regards
> Sandra
>


Re: [EXTERNAL] Re: Spark streaming

2022-08-20 Thread sandra sukumaran
Thanks, I'll try it out.

On Fri, 19 Aug 2022, 6:12 pm Saurabh Gulati, 
wrote:

> You can also try out
> https://debezium.io/documentation/reference/0.10/connectors/mysql.html
> --
> *From:* Ajit Kumar Amit 
> *Sent:* 19 August 2022 14:30
> *To:* sandra sukumaran 
> *Cc:* user@spark.apache.org 
> *Subject:* [EXTERNAL] Re: Spark streaming
>
> *Caution! This email originated outside of FedEx. Please do not open
> attachments or click links from an unknown or suspicious origin*.
> https://github.com/allwefantasy/spark-binlog
> <https://urldefense.com/v3/__https://github.com/allwefantasy/spark-binlog__;!!BL9GA0TyTA!actkW_0ZhxHTImtHaWtPiApgCD3oISkeTDvhezK35qIcodH-PjmcOxZ9BCpVfZxXdhNi9ciMkltEEY7C5_QtFQ$>
>
> Sent from my iPhone
>
> On 19 Aug 2022, at 5:45 PM, sandra sukumaran 
> wrote:
>
> 
> Dear Sir,
>
>
>
>  Is there any possible method to fetch MySQL database bin log, with
> the help of spark streaming.
> Kafka streaming is not applicable in this case.
>
>
>
> Thanks and regards
> Sandra
>
>


Re: [EXTERNAL] Re: Spark streaming

2022-08-19 Thread Saurabh Gulati
You can also try out 
https://debezium.io/documentation/reference/0.10/connectors/mysql.html

From: Ajit Kumar Amit 
Sent: 19 August 2022 14:30
To: sandra sukumaran 
Cc: user@spark.apache.org 
Subject: [EXTERNAL] Re: Spark streaming

Caution! This email originated outside of FedEx. Please do not open attachments 
or click links from an unknown or suspicious origin.

https://github.com/allwefantasy/spark-binlog<https://urldefense.com/v3/__https://github.com/allwefantasy/spark-binlog__;!!BL9GA0TyTA!actkW_0ZhxHTImtHaWtPiApgCD3oISkeTDvhezK35qIcodH-PjmcOxZ9BCpVfZxXdhNi9ciMkltEEY7C5_QtFQ$>

Sent from my iPhone

On 19 Aug 2022, at 5:45 PM, sandra sukumaran  
wrote:


Dear Sir,



 Is there any possible method to fetch MySQL database bin log, with the 
help of spark streaming.
Kafka streaming is not applicable in this case.



Thanks and regards
Sandra


Re: Spark streaming

2022-08-19 Thread Ajit Kumar Amit
https://github.com/allwefantasy/spark-binlog

Sent from my iPhone

> On 19 Aug 2022, at 5:45 PM, sandra sukumaran  
> wrote:
> 
> 
> Dear Sir,
> 
> 
> 
>  Is there any possible method to fetch MySQL database bin log, with the 
> help of spark streaming.
> Kafka streaming is not applicable in this case.
> 
> 
> 
> Thanks and regards
> Sandra


Spark streaming

2022-08-19 Thread sandra sukumaran
Dear Sir,



 Is there any possible method to fetch MySQL database bin log, with the
help of spark streaming.
Kafka streaming is not applicable in this case.



Thanks and regards
Sandra


Re: Spark streaming

2022-08-18 Thread ミユナ (alice)
> Dear sir,
>
>
>I want to check the logs of MySQL database using spark streaming, can
> someone help me with those listening queries.
>
>
> Thanks and regards
> Akash P
>

you can ingest logs by fluent-bit to kafka then setup spark to read
records from kafka by streaming.





Spark streaming

2022-08-17 Thread Prajith Vellukkai
Dear sir,


   I want to check the logs of MySQL database using spark streaming, can
someone help me with those listening queries.


Thanks and regards
Akash P


Re: [EXTERNAL] Re: Spark streaming - Data Ingestion

2022-08-17 Thread Akash Vellukai
I am a beginner with Spark. May I also know how to connect a MySQL database with
Spark Streaming?

Thanks and regards
Akash P

On Wed, 17 Aug, 2022, 8:28 pm Saurabh Gulati, 
wrote:

> Another take:
>
>- Debezium
><https://debezium.io/documentation/reference/stable/connectors/mysql.html>
>to read Write Ahead logs(WAL) and send to Kafka
>- Kafka connect to write to cloud storage -> Hive
>   - OR
>
>
>- Spark streaming to parse WAL -> Storage -> Hive
>
> Regards
> --
> *From:* Gibson 
> *Sent:* 17 August 2022 16:53
> *To:* Akash Vellukai 
> *Cc:* user@spark.apache.org 
> *Subject:* [EXTERNAL] Re: Spark streaming - Data Ingestion
>
> *Caution! This email originated outside of FedEx. Please do not open
> attachments or click links from an unknown or suspicious origin*.
> If you have space for a message log like Kafka, then you should try:
>
> MySQL -> Kafka (via CDC) -> Spark (Structured Streaming) -> HDFS/S3/ADLS
> -> Hive
>
> On Wed, Aug 17, 2022 at 5:40 PM Akash Vellukai 
> wrote:
>
> Dear sir
>
> I have tried a lot on this could you help me with this?
>
> Data ingestion from MySql to Hive with spark- streaming?
>
> Could you give me an overview.
>
>
> Thanks and regards
> Akash P
>
>


Re: [EXTERNAL] Re: Spark streaming - Data Ingestion

2022-08-17 Thread Gibson
The idea behind spark-streaming is to process change events as they occur,
hence the suggestions above that require capturing change events using
Debezium.

But you can use jdbc drivers to connect Spark to relational databases
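For the JDBC route, a minimal sketch, assuming a reachable MySQL instance with the
MySQL Connector/J driver on the Spark classpath; the URL, table name and credentials
below are placeholders. Note that this is a plain batch read of the table's current
contents, not a stream of change events:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MySQLOverJDBC").getOrCreate()

# Batch read of the current table contents over JDBC (no change capture)
df = (spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/mydb")  # placeholder host/db
      .option("dbtable", "orders")                        # placeholder table
      .option("user", "spark_user")                       # placeholder credentials
      .option("password", "secret")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .load())

df.show(5)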


On Wed, Aug 17, 2022 at 6:21 PM Akash Vellukai 
wrote:

> I am a beginner with Spark. May I also know how to connect a MySQL database
> with Spark Streaming?
>
> Thanks and regards
> Akash P
>
> On Wed, 17 Aug, 2022, 8:28 pm Saurabh Gulati, 
> wrote:
>
>> Another take:
>>
>>- Debezium
>><https://debezium.io/documentation/reference/stable/connectors/mysql.html>
>>to read Write Ahead logs(WAL) and send to Kafka
>>- Kafka connect to write to cloud storage -> Hive
>>   - OR
>>
>>
>>- Spark streaming to parse WAL -> Storage -> Hive
>>
>> Regards
>> --
>> *From:* Gibson 
>> *Sent:* 17 August 2022 16:53
>> *To:* Akash Vellukai 
>> *Cc:* user@spark.apache.org 
>> *Subject:* [EXTERNAL] Re: Spark streaming - Data Ingestion
>>
>> *Caution! This email originated outside of FedEx. Please do not open
>> attachments or click links from an unknown or suspicious origin*.
>> If you have space for a message log like Kafka, then you should try:
>>
>> MySQL -> Kafka (via CDC) -> Spark (Structured Streaming) -> HDFS/S3/ADLS
>> -> Hive
>>
>> On Wed, Aug 17, 2022 at 5:40 PM Akash Vellukai <
>> akashvellukai...@gmail.com> wrote:
>>
>> Dear sir
>>
>> I have tried a lot on this could you help me with this?
>>
>> Data ingestion from MySql to Hive with spark- streaming?
>>
>> Could you give me an overview.
>>
>>
>> Thanks and regards
>> Akash P
>>
>>


Re: [EXTERNAL] Re: Spark streaming - Data Ingestion

2022-08-17 Thread Saurabh Gulati
Another take:

  *   
Debezium<https://debezium.io/documentation/reference/stable/connectors/mysql.html>
 to read Write Ahead logs(WAL) and send to Kafka
  *   Kafka connect to write to cloud storage -> Hive
 *   OR

  *   Spark streaming to parse WAL -> Storage -> Hive

Regards

From: Gibson 
Sent: 17 August 2022 16:53
To: Akash Vellukai 
Cc: user@spark.apache.org 
Subject: [EXTERNAL] Re: Spark streaming - Data Ingestion

Caution! This email originated outside of FedEx. Please do not open attachments 
or click links from an unknown or suspicious origin.

If you have space for a message log like Kafka, then you should try:

MySQL -> Kafka (via CDC) -> Spark (Structured Streaming) -> HDFS/S3/ADLS -> Hive

On Wed, Aug 17, 2022 at 5:40 PM Akash Vellukai 
mailto:akashvellukai...@gmail.com>> wrote:
Dear sir

I have tried a lot on this could you help me with this?

Data ingestion from MySql to Hive with spark- streaming?

Could you give me an overview.


Thanks and regards
Akash P


Re: Spark streaming - Data Ingestion

2022-08-17 Thread Gibson
If you have space for a message log like Kafka, then you should try:

MySQL -> Kafka (via CDC) -> Spark (Structured Streaming) -> HDFS/S3/ADLS ->
Hive
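A minimal sketch of the Spark leg of that pipeline, assuming a Debezium-style CDC topic
and the spark-sql-kafka package on the classpath; the broker, topic, bucket and checkpoint
paths are placeholders. It lands the raw change events as Parquet, which a Hive external
table can then point at:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("CDCToLake").getOrCreate()

# Read the CDC topic produced by Debezium / Kafka Connect
cdc = (spark.readStream.format("kafka")
       .option("kafka.bootstrap.servers", "broker:9092")   # placeholder broker
       .option("subscribe", "mysql.mydb.orders")           # placeholder topic
       .option("startingOffsets", "earliest")
       .load()
       .select(col("key").cast("string"),
               col("value").cast("string").alias("change_event"),
               col("timestamp")))

# Land the raw change events as Parquet; a Hive external table can point here
query = (cdc.writeStream.format("parquet")
         .option("path", "s3a://my-bucket/cdc/orders/")                 # placeholder path
         .option("checkpointLocation", "s3a://my-bucket/checkpoints/orders/")
         .trigger(processingTime="1 minute")
         .start())

query.awaitTermination()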

On Wed, Aug 17, 2022 at 5:40 PM Akash Vellukai 
wrote:

> Dear sir
>
> I have tried a lot on this could you help me with this?
>
> Data ingestion from MySql to Hive with spark- streaming?
>
> Could you give me an overview.
>
>
> Thanks and regards
> Akash P
>


Spark streaming - Data Ingestion

2022-08-17 Thread Akash Vellukai
Dear sir

I have tried a lot on this could you help me with this?

Data ingestion from MySql to Hive with spark- streaming?

Could you give me an overview.


Thanks and regards
Akash P


Re: Updating Broadcast Variable in Spark Streaming 2.4.4

2022-07-22 Thread Sean Owen
I think you're taking the right approach, trying to create a new broadcast
var. What part doesn't work? for example I wonder if comparing Map equality
like that does what you think, isn't it just reference equality? debug a
bit more to see whether it even destroys and recreates the broadcast in
your code.

On Fri, Jul 22, 2022 at 4:24 AM Dipl.-Inf. Rico Bergmann <
i...@ricobergmann.de> wrote:

> Hi folks!
>
> I'm trying to implement an update of a broadcast var in Spark Streaming.
> The idea is that whenever some configuration value has changed (this is
> periodically checked by the driver) the existing broadcast variable is
> unpersisted and then (re-)broadcasted.
>
> In a local test setup (using a local Spark) it works fine but on a real
> cluster it doesn't work. The broadcast variable never gets updated. Am I
> doing something wrong? Or is this simply not possible? Or a bug?
>
> Code snippet:
>
> @RequiredArgsConstructor
> public class BroadcastUpdater implements Runnable {
>     private final transient JavaSparkContext sparkContext;
>     @Getter
>     private transient volatile Broadcast<Map<String, String>> broadcastVar;
>     private transient Map<String, String> configMap;
>
>     public void run() {
>         Map<String, String> configMap = getConfigMap();
>         if (this.broadcastVar == null || !configMap.equals(this.configMap)) {
>             this.configMap = configMap;
>             if (broadcastVar != null) {
>                 broadcastVar.unpersist(true);
>                 broadcastVar.destroy(true);
>             }
>             this.broadcastVar = this.sparkContext.broadcast(this.configMap);
>         }
>     }
>
>     private Map<String, String> getConfigMap() {
>         //impl details
>     }
> }
>
> public class StreamingFunction implements Serializable {
>
>     private transient volatile BroadcastUpdater broadcastUpdater;
>
>     protected JavaStreamingContext startStreaming(JavaStreamingContext context,
>             ConsumerStrategy<String, ChangeDataRecord> consumerStrategy) {
>         broadcastUpdater = new BroadcastUpdater(context.sparkContext());
>         ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
>         scheduledThreadPoolExecutor.scheduleWithFixedDelay(broadcastUpdater, 0,
>                 3, TimeUnit.SECONDS);
>
>         final JavaInputDStream<ConsumerRecord<String, ChangeDataRecord>> inputStream =
>                 KafkaUtils.createDirectStream(context,
>                         LocationStrategies.PreferConsistent(), consumerStrategy);
>
>         inputStream.foreachRDD(rdd -> {
>             Broadcast<Map<String, String>> broadcastVar = broadcastUpdater.getBroadcastVar();
>             rdd.foreachPartition(partition -> {
>                 if (partition.hasNext()) {
>                     Map<String, String> configMap = broadcastVar.getValue();
>
>                     // iterate
>                     while (partition.hasNext()) {
>                         //impl logic using broadcast variable
>                     }
>                 }
>             });
>         });
>     }
> }
>
>
>


Updating Broadcast Variable in Spark Streaming 2.4.4

2022-07-22 Thread Dipl.-Inf. Rico Bergmann

Hi folks!

I'm trying to implement an update of a broadcast var in Spark Streaming. 
The idea is that whenever some configuration value has changed (this is 
periodically checked by the driver) the existing broadcast variable is 
unpersisted and then (re-)broadcasted.


In a local test setup (using a local Spark) it works fine but on a real 
cluster it doesn't work. The broadcast variable never gets updated. Am I 
doing something wrong? Or is this simply not possible? Or a bug?


Code snippet:

@RequiredArgsConstructor
public class BroadcastUpdater implements Runnable {
    private final transient JavaSparkContext sparkContext;
    @Getter
    private transient volatile Broadcast<Map<String, String>> broadcastVar;
    private transient Map<String, String> configMap;

    public void run() {
        Map<String, String> configMap = getConfigMap();
        if (this.broadcastVar == null || !configMap.equals(this.configMap)) {
            this.configMap = configMap;
            if (broadcastVar != null) {
                broadcastVar.unpersist(true);
                broadcastVar.destroy(true);
            }
            this.broadcastVar = this.sparkContext.broadcast(this.configMap);
        }
    }

    private Map<String, String> getConfigMap() {
        //impl details
    }
}

public class StreamingFunction implements Serializable {

    private transient volatile BroadcastUpdater broadcastUpdater;

    protected JavaStreamingContext startStreaming(JavaStreamingContext context,
            ConsumerStrategy<String, ChangeDataRecord> consumerStrategy) {

        broadcastUpdater = new BroadcastUpdater(context.sparkContext());
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
        scheduledThreadPoolExecutor.scheduleWithFixedDelay(broadcastUpdater, 0,
                3, TimeUnit.SECONDS);

        final JavaInputDStream<ConsumerRecord<String, ChangeDataRecord>> inputStream =
                KafkaUtils.createDirectStream(context,
                        LocationStrategies.PreferConsistent(), consumerStrategy);

        inputStream.foreachRDD(rdd -> {
            Broadcast<Map<String, String>> broadcastVar = broadcastUpdater.getBroadcastVar();

            rdd.foreachPartition(partition -> {
                if (partition.hasNext()) {
                    Map<String, String> configMap = broadcastVar.getValue();

                    // iterate
                    while (partition.hasNext()) {
                        //impl logic using broadcast variable
                    }
                }
            });
        });
    }
}




Re: Spark streaming pending mircobatches queue max length

2022-07-13 Thread Anil Dasari
Retry.

From: Anil Dasari 
Date: Tuesday, July 12, 2022 at 3:42 PM
To: user@spark.apache.org 
Subject: Spark streaming pending mircobatches queue max length
Hello,

Spark is adding entry to pending microbatches queue at periodic batch interval. 
Is there config to set the max size for pending microbatches queue ?

Thanks


Spark streaming pending mircobatches queue max length

2022-07-12 Thread Anil Dasari
Hello,

Spark is adding entry to pending microbatches queue at periodic batch interval. 
Is there config to set the max size for pending microbatches queue ?

Thanks


Spark streaming / confluent Kafka- messages are empty

2022-06-09 Thread KhajaAsmath Mohammed


Hi,

I am trying to read data from confluent Kafka using  avro schema registry. 
Messages are always empty and stream always shows empty records. Any suggestion 
on this please ??

Thanks,
Asmath



Re: protobuf data as input to spark streaming

2022-05-30 Thread Kiran Biswal
Hello Stelios, friendly reminder if you could share any sample code/repo

Are you using a schema registry?

Thanks
Kiran

On Fri, Apr 8, 2022 at 4:37 PM Kiran Biswal  wrote:

> Hello Stelios
>
> Just a gentle follow up if you can share any sample code/repo
>
> Regards
> Kiran
>
> On Wed, Apr 6, 2022 at 3:19 PM Kiran Biswal  wrote:
>
>> Hello Stelios
>>
>> Preferred language would have been Scala or pyspark but if Java is proven
>> I am open to using it
>>
>> Any sample reference or example code link?
>>
>> How are you handling the protobuf to Spark dataframe conversion
>> (serialization/deserialization)?
>>
>> Thanks
>> Kiran
>>
>> On Wed, Apr 6, 2022, 2:38 PM Stelios Philippou 
>> wrote:
>>
>>> Yes we are currently using it as such.
>>> Code is in java. Will that work?
>>>
>>> On Wed, 6 Apr 2022 at 00:51, Kiran Biswal  wrote:
>>>
 Hello Experts

 Has anyone used protobuf (proto3) encoded data (from kafka) as input
 source and been able to do spark structured streaming?

 I would appreciate if you can share any sample code/example

 Regards
 Kiran

>


Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-20 Thread Xavier Gervilla
Thank you for the flatten function, it has a bigger functionality than what I 
need for my project but the examples (which were really, really useful) helped 
me find a solution.



Instead of accessing the confidence and entity attributes (metadata.confidence 
and metadata.entity) I was accessing by metadata.value, which instead of 
returning an error gave null values. In addition, the confidence value (a 
number) has StringType so before calculating the average I had to convert it to 
DoubleType.
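
For reference, a minimal sketch of that aggregation, assuming the Spark NLP output
column is called ner_chunk and carries the usual annotation struct with
metadata.entity and metadata.confidence; those names and the helper function below
are assumptions, not part of the original code:

from pyspark.sql import DataFrame
from pyspark.sql.functions import avg, col, count, explode
from pyspark.sql.types import DoubleType

def entity_confidence(nerDF: DataFrame, ner_col: str = "ner_chunk") -> DataFrame:
    # Explode the annotation array, then average the (string-typed) confidence
    # per entity label after casting it to double.
    flattened = nerDF.select(explode(col(ner_col)).alias("chunk"))
    return (flattened
            .groupBy(col("chunk.metadata.entity").alias("entity"))
            .agg(avg(col("chunk.metadata.confidence").cast(DoubleType())).alias("confidence"),
                 count("*").alias("count")))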



The output generated now is the same as with Spark but using only around 8.5GB 
of RAM so there's no longer a memory error!



Thank you again for all your help!



On Wed, 20 Apr 2022 14:24:46 +0200, Bjørn Jørgensen wrote:



Glad to hear that it works :) 



Your dataframe is nested with both map, array and struct. 



I`m using this function to flatten a nested dataframe to rows and columns.  



from pyspark.sql.types import *
from pyspark.sql.functions import *


def flatten_test(df, sep="_"):
    """Returns a flattened dataframe.
        .. versionadded:: x.X.X
        
        Parameters
        --
        sep : str
            Delimiter for flatted columns. Default `_`
        
        Notes
        -
        Don`t use `.` as `sep`
        It won't work on nested data frames with more than one level.
        And you will have to use `columns.name`. 
        
        Flattening Map Types will have to find every key in the column. 
        This can be slow.
        
        Examples
        

        data_mixed = [
            {
                "state": "Florida",
                "shortname": "FL",
                "info": {"governor": "Rick Scott"},
                "counties": [
                    {"name": "Dade", "population": 12345},
                    {"name": "Broward", "population": 4},
                    {"name": "Palm Beach", "population": 6},
                ],
            },
            {
                "state": "Ohio",
                "shortname": "OH",
                "info": {"governor": "John Kasich"},
                "counties": [
                    {"name": "Summit", "population": 1234},
                    {"name": "Cuyahoga", "population": 1337},
                ],
            },
        ]

        data_mixed = spark.createDataFrame(data=data_mixed)

        data_mixed.printSchema()

        root
        |-- counties: array (nullable = true)
        |    |-- element: map (containsNull = true)
        |    |    |-- key: string
        |    |    |-- value: string (valueContainsNull = true)
        |-- info: map (nullable = true)
        |    |-- key: string
        |    |-- value: string (valueContainsNull = true)
        |-- shortname: string (nullable = true)
        |-- state: string (nullable = true)
        
        
        data_mixed_flat = flatten_test(df, sep=":")
        data_mixed_flat.printSchema()
        root
        |-- shortname: string (nullable = true)
        |-- state: string (nullable = true)
        |-- counties:name: string (nullable = true)
        |-- counties:population: string (nullable = true)
        |-- info:governor: string (nullable = true)
        

        
        
        data = [
            {
                "id": 1,
                "name": "Cole Volk",
                "fitness": {"height": 130, "weight": 60},
            },
            {"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}},
            {
                "id": 2,
                "name": "Faye Raker",
                "fitness": {"height": 130, "weight": 60},
            },
        ]


        df = spark.createDataFrame(data=data)

        df.printSchema()

        root
        |-- fitness: map (nullable = true)
        |    |-- key: string
        |    |-- value: long (valueContainsNull = true)
        |-- id: long (nullable = true)
        |-- name: string (nullable = true)
        
        df_flat = flatten_test(df, sep=":")

        df_flat.printSchema()

        root
        |-- id: long (nullable = true)
        |-- name: string (nullable = true)
        |-- fitness:height: long (nullable = true)
        |-- fitness:weight: long (nullable = true)
        
        data_struct = [
                (("James",None,"Smith"),"OH","M"),
                (("Anna","Rose",""),"NY","F"),
                (("Julia","","Williams"),"OH","F"),
                (("Maria","Anne","Jones"),"NY","M"),
                (("Jen","Mary","Brown"),"NY","M"),
                (("Mike","Mary","Williams"),"OH","M")
                ]

                
        schema = StructType([
            StructField('name', StructType([
                StructField('firstname', StringType(), True),
                StructField('middlename', StringType(), True),
                StructField('lastname', StringType(), True)
                ])),
            StructField('state', StringType(), True),
            StructField('gender', StringType(), True)
            ])

 

Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-20 Thread Bjørn Jørgensen
Glad to hear that it works :)

Your dataframe is nested with both map, array and struct.

I`m using this function to flatten a nested dataframe to rows and columns.

from pyspark.sql.types import *
from pyspark.sql.functions import *


def flatten_test(df, sep="_"):
"""Returns a flattened dataframe.
.. versionadded:: x.X.X

Parameters
--
sep : str
Delimiter for flatted columns. Default `_`

Notes
-
Don`t use `.` as `sep`
It won't work on nested data frames with more than one level.
And you will have to use `columns.name`.

Flattening Map Types will have to find every key in the column.
This can be slow.

Examples


data_mixed = [
{
"state": "Florida",
"shortname": "FL",
"info": {"governor": "Rick Scott"},
"counties": [
{"name": "Dade", "population": 12345},
{"name": "Broward", "population": 4},
{"name": "Palm Beach", "population": 6},
],
},
{
"state": "Ohio",
"shortname": "OH",
"info": {"governor": "John Kasich"},
"counties": [
{"name": "Summit", "population": 1234},
{"name": "Cuyahoga", "population": 1337},
],
},
]

data_mixed = spark.createDataFrame(data=data_mixed)

data_mixed.printSchema()

root
|-- counties: array (nullable = true)
||-- element: map (containsNull = true)
|||-- key: string
|||-- value: string (valueContainsNull = true)
|-- info: map (nullable = true)
||-- key: string
||-- value: string (valueContainsNull = true)
|-- shortname: string (nullable = true)
|-- state: string (nullable = true)


data_mixed_flat = flatten_test(df, sep=":")
data_mixed_flat.printSchema()
root
|-- shortname: string (nullable = true)
|-- state: string (nullable = true)
|-- counties:name: string (nullable = true)
|-- counties:population: string (nullable = true)
|-- info:governor: string (nullable = true)




data = [
{
"id": 1,
"name": "Cole Volk",
"fitness": {"height": 130, "weight": 60},
},
{"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}},
{
"id": 2,
"name": "Faye Raker",
"fitness": {"height": 130, "weight": 60},
},
]


df = spark.createDataFrame(data=data)

df.printSchema()

root
|-- fitness: map (nullable = true)
||-- key: string
||-- value: long (valueContainsNull = true)
|-- id: long (nullable = true)
|-- name: string (nullable = true)

df_flat = flatten_test(df, sep=":")

df_flat.printSchema()

root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- fitness:height: long (nullable = true)
|-- fitness:weight: long (nullable = true)

data_struct = [
(("James",None,"Smith"),"OH","M"),
(("Anna","Rose",""),"NY","F"),
(("Julia","","Williams"),"OH","F"),
(("Maria","Anne","Jones"),"NY","M"),
(("Jen","Mary","Brown"),"NY","M"),
(("Mike","Mary","Williams"),"OH","M")
]


schema = StructType([
StructField('name', StructType([
StructField('firstname', StringType(), True),
StructField('middlename', StringType(), True),
StructField('lastname', StringType(), True)
])),
StructField('state', StringType(), True),
StructField('gender', StringType(), True)
])

df_struct = spark.createDataFrame(data = data_struct, schema =
schema)

df_struct.printSchema()

root
|-- name: struct (nullable = true)
||-- firstname: string (nullable = true)
||-- middlename: string (nullable = true)
||-- lastname: string (nullable = true)
|-- state: string (nullable = true)
|-- gender: string (nullable = true)

df_struct_flat = flatten_test(df_struct, sep=":")

df_struct_flat.printSchema()

root
|-- state: string (nullable = true)
|-- gender: string (nullable = true)
|-- name:firstname: string (nullable = true)
|-- name:middlename: string (nullable = true)
|-- name:lastname: string (nullable = true)
"""
# compute Complex Fields (Arrays, Structs and Map Types) in Schema

Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-19 Thread Bjørn Jørgensen
https://github.com/JohnSnowLabs/spark-nlp#packages-cheatsheet

change spark = sparknlp.start()
to
spark = sparknlp.start(spark32=True)


tir. 19. apr. 2022 kl. 21:10 skrev Bjørn Jørgensen :

> Yes, there are some that have that issue.
>
> Please open a new issue at
> https://github.com/JohnSnowLabs/spark-nlp/issues and they will help you.
>
>
>
>
> tir. 19. apr. 2022 kl. 20:33 skrev Xavier Gervilla <
> xavier.gervi...@datapta.com>:
>
>> Thank you for your advice, I had small knowledge of Spark NLP and I
>> thought it was only possible to use with models that required training and
>> therefore my project wasn’t the case. I'm trying now to build the project
>> again with SparkNLP but when I try to load a pretrained model from
>> JohnSnowLabs I get an error (*py4j.protocol.Py4JJavaError: An error
>> occurred while calling
>> z:com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.getDownloadSize.*
>> ).
>>
>> This is the new basic code to develop the project again:
>>
>>
>> spark = sparknlp.start()
>>
>> pipelineName = 'analyze_sentiment'
>>
>> pipeline = PretrainedPipeline(pipelineName, 'en') #this is the line that
>> generates the error
>>
>> rawTweets = spark.readStream.format('socket').option('host',
>> 'localhost').option('port',9008).load()
>>
>> allTweets = rawTweets.selectExpr('CAST(value AS
>> STRING)').withColumnRenamed('value', 'text').dropDuplicates('text')
>>
>> sentPred = pipeline.transform(allTweets)
>>
>> query =
>> sentPred.writeStream.outputMode('complete').format('console').start()
>> query.awaitTermination()
>>
>> Spark version is 3.2.1 and SparkNLP version is 3.4.3, while Java version
>> is 8. I've tried with a different model but the error is still the same, so
>> what could be causing it?
>>
>> If this error is solved I think SparkNLP will be the solution I was
>> looking for to reduce memory consumption so thank you again for suggesting
>> it.
>>
>>
>>
>> El 18 abr 2022, a las 21:07, Bjørn Jørgensen 
>> escribió:
>>
>> When did SpaCy have support for Spark?
>>
>> Try Spark NLP  it`s made for spark. They
>> have a lot of notebooks at https://github.com/JohnSnowLabs/spark-nlp and
>> they public user guides at
>> https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59
>>
>>
>>
>>
>> man. 18. apr. 2022 kl. 16:17 skrev Sean Owen :
>>
>> It looks good, are you sure it even starts? the problem I see is that you
>> send a copy of the model from the driver for every task. Try broadcasting
>> the model instead. I'm not sure if that resolves it but would be a good
>> practice.
>>
>> On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla <
>> xavier.gervi...@datapta.com> wrote:
>>
>>
>> Hi Team,
>> 
>>
>> I'm developing a project that retrieves tweets on a 'host' app, streams
>> them to Spark and with different operations with DataFrames obtains the
>> Sentiment of the tweets and their entities applying a Sentiment model and a
>> NER model respectively.
>>
>> The problem I've come across is that when applying the NER model, the RAM
>> consumption increases until the program stops with a memory error because
>> there's no memory left to execute. In addition, on SparkUI I've seen that
>> there's only one executor running, the executor driver, but using htop on
>> the terminal I see that the 8 cores of the instance are executing at 100%.
>>
>> The SparkSession is only configured to receive the tweets from the socket
>> that connects with the second program that sends the tweets. The DataFrame
>> goes through some processing to obtain other properties of the tweet like
>> its sentiment (which causes no error even with less than 8GB of RAM) and
>> then the NER is applied.
>>
>> *spark = SparkSession.builder.appName(**"TwitterStreamApp"**).getOrCreate()
>> rawTweets = spark.readStream.**format**(**"socket"**).option(**"host"**, 
>> **"localhost"**).option(**"port"**,**9008**).load()
>> tweets = rawTweets.selectExpr(**"CAST(value AS STRING)"**)
>>
>> **#prior processing of the tweets**
>> sentDF = other_processing(tweets)
>>
>> **#obtaining the column that contains the list of entities from a tweet**
>> nerDF = ner_classification(sentDF)*
>>
>>
>> This is the code of the functions related to obtaining the NER, the "main
>> call" and the UDF function.
>>
>> *nerModel = spacy.load(**"en_core_web_sm"**)
>>
>> **#main call, applies the UDF function to every tweet from the "tweet" 
>> column**def* *ner_classification**(**words**):
>> ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>> words = words.withColumn(**"nerlist"**, ner_list(**"tweet"**))
>> **return** words
>>
>> **#udf function**def* *obtain_ner_udf**(**words**):
>> **#if the tweet is empty return None*
>> *if** words == **""**:
>> **return* *None*
>> *#else: applying the NER model 

Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-19 Thread Bjørn Jørgensen
Yes, there are some that have that issue.

Please open a new issue at https://github.com/JohnSnowLabs/spark-nlp/issues
and they will help you.




tir. 19. apr. 2022 kl. 20:33 skrev Xavier Gervilla <
xavier.gervi...@datapta.com>:

> Thank you for your advice, I had small knowledge of Spark NLP and I
> thought it was only possible to use with models that required training and
> therefore my project wasn’t the case. I'm trying now to build the project
> again with SparkNLP but when I try to load a pretrained model from
> JohnSnowLabs I get an error (*py4j.protocol.Py4JJavaError: An error
> occurred while calling
> z:com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.getDownloadSize.*
> ).
>
> This is the new basic code to develop the project again:
>
>
> spark = sparknlp.start()
>
> pipelineName = 'analyze_sentiment'
>
> pipeline = PretrainedPipeline(pipelineName, 'en') #this is the line that
> generates the error
>
> rawTweets = spark.readStream.format('socket').option('host',
> 'localhost').option('port',9008).load()
>
> allTweets = rawTweets.selectExpr('CAST(value AS
> STRING)').withColumnRenamed('value', 'text').dropDuplicates('text')
>
> sentPred = pipeline.transform(allTweets)
>
> query =
> sentPred.writeStream.outputMode('complete').format('console').start()
> query.awaitTermination()
>
> Spark version is 3.2.1 and SparkNLP version is 3.4.3, while Java version
> is 8. I've tried with a different model but the error is still the same, so
> what could be causing it?
>
> If this error is solved I think SparkNLP will be the solution I was
> looking for to reduce memory consumption so thank you again for suggesting
> it.
>
>
>
> El 18 abr 2022, a las 21:07, Bjørn Jørgensen 
> escribió:
>
> When did SpaCy have support for Spark?
>
> Try Spark NLP  it`s made for spark. They
> have a lot of notebooks at https://github.com/JohnSnowLabs/spark-nlp and
> they public user guides at
> https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59
>
>
>
>
> man. 18. apr. 2022 kl. 16:17 skrev Sean Owen :
>
> It looks good, are you sure it even starts? the problem I see is that you
> send a copy of the model from the driver for every task. Try broadcasting
> the model instead. I'm not sure if that resolves it but would be a good
> practice.
>
> On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla <
> xavier.gervi...@datapta.com> wrote:
>
>
> Hi Team,
> 
>
> I'm developing a project that retrieves tweets on a 'host' app, streams
> them to Spark and with different operations with DataFrames obtains the
> Sentiment of the tweets and their entities applying a Sentiment model and a
> NER model respectively.
>
> The problem I've come across is that when applying the NER model, the RAM
> consumption increases until the program stops with a memory error because
> there's no memory left to execute. In addition, on SparkUI I've seen that
> there's only one executor running, the executor driver, but using htop on
> the terminal I see that the 8 cores of the instance are executing at 100%.
>
> The SparkSession is only configured to receive the tweets from the socket
> that connects with the second program that sends the tweets. The DataFrame
> goes through some processing to obtain other properties of the tweet like
> its sentiment (which causes no error even with less than 8GB of RAM) and
> then the NER is applied.
>
> *spark = SparkSession.builder.appName(**"TwitterStreamApp"**).getOrCreate()
> rawTweets = spark.readStream.**format**(**"socket"**).option(**"host"**, 
> **"localhost"**).option(**"port"**,**9008**).load()
> tweets = rawTweets.selectExpr(**"CAST(value AS STRING)"**)
>
> **#prior processing of the tweets**
> sentDF = other_processing(tweets)
>
> **#obtaining the column that contains the list of entities from a tweet**
> nerDF = ner_classification(sentDF)*
>
>
> This is the code of the functions related to obtaining the NER, the "main
> call" and the UDF function.
>
> *nerModel = spacy.load(**"en_core_web_sm"**)
>
> **#main call, applies the UDF function to every tweet from the "tweet" 
> column**def* *ner_classification**(**words**):
> ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
> words = words.withColumn(**"nerlist"**, ner_list(**"tweet"**))
> **return** words
>
> **#udf function**def* *obtain_ner_udf**(**words**):
> **#if the tweet is empty return None*
> *if** words == **""**:
> **return* *None*
> *#else: applying the NER model (Spacy en_core_web_sm)**
> entities = nerModel(words)
>
> **#returns a list of the form ['entity1_label1', 'entity2_label2',...]*
> *return** [ word.text + **'_'** + word.label_ **for** word **in** 
> entities.ents ]*
>
>
>
> And lastly I map each entity with the sentiment from its tweet and obtain
> the 

Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-19 Thread Jungtaek Lim
I have no context on ML, but your "streaming" query exposes the possibility
of memory issues.

>>> flattenedNER.registerTempTable("df")
>>>
>>> querySelect = "SELECT col as entity, avg(sentiment) as sentiment,
>>> count(col) as count FROM df GROUP BY col"
>>> finalDF = spark.sql(querySelect)
>>>
>>> query =
>>> finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()
>>>
>>>
Since this is a streaming query, grouped aggregation incurs state store,
and since you use the output mode as complete, state store will grow over
time which will dominate the memory in executors.
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time
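
A minimal sketch of one way to keep that state bounded, assuming an event-time column
named timestamp can be attached to the stream; the watermark lets Spark drop old groups
instead of keeping them forever, at the cost of no longer emitting the complete result on
every batch. The DataFrame name reuses flattenedNER from the query quoted below:

from pyspark.sql.functions import avg, col, count, window

bounded = (flattenedNER
           .withWatermark("timestamp", "10 minutes")
           .groupBy(window(col("timestamp"), "5 minutes"), col("col").alias("entity"))
           .agg(avg("sentiment").alias("sentiment"), count("*").alias("count")))

query = (bounded.writeStream
         .outputMode("update")   # only changed groups per batch; state for closed windows is evicted
         .foreachBatch(lambda df, epoch_id: df.show(truncate=False))
         .start())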


On Tue, Apr 19, 2022 at 4:07 AM Bjørn Jørgensen 
wrote:

> When did SpaCy have support for Spark?
>
> Try Spark NLP  it`s made for spark. They
> have a lot of notebooks at https://github.com/JohnSnowLabs/spark-nlp and
> they public user guides at
> https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59
>
>
>
>
> man. 18. apr. 2022 kl. 16:17 skrev Sean Owen :
>
>> It looks good, are you sure it even starts? the problem I see is that you
>> send a copy of the model from the driver for every task. Try broadcasting
>> the model instead. I'm not sure if that resolves it but would be a good
>> practice.
>>
>> On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla <
>> xavier.gervi...@datapta.com> wrote:
>>
>>> Hi Team,
>>> 
>>>
>>> I'm developing a project that retrieves tweets on a 'host' app, streams
>>> them to Spark and with different operations with DataFrames obtains the
>>> Sentiment of the tweets and their entities applying a Sentiment model and a
>>> NER model respectively.
>>>
>>> The problem I've come across is that when applying the NER model, the
>>> RAM consumption increases until the program stops with a memory error
>>> because there's no memory left to execute. In addition, on SparkUI I've
>>> seen that there's only one executor running, the executor driver, but using
>>> htop on the terminal I see that the 8 cores of the instance are executing
>>> at 100%.
>>>
>>> The SparkSession is only configured to receive the tweets from the
>>> socket that connects with the second program that sends the tweets. The
>>> DataFrame goes through some processing to obtain other properties of the
>>> tweet like its sentiment (which causes no error even with less than 8GB of
>>> RAM) and then the NER is applied.
>>>
>>> *spark = SparkSession.builder.appName(**"TwitterStreamApp"**).getOrCreate()
>>> rawTweets = spark.readStream.**format**(**"socket"**).option(**"host"**, 
>>> **"localhost"**).option(**"port"**,**9008**).load()
>>> tweets = rawTweets.selectExpr(**"CAST(value AS STRING)"**)
>>>
>>> **#prior processing of the tweets**
>>> sentDF = other_processing(tweets)
>>>
>>> **#obtaining the column that contains the list of entities from a tweet**
>>> nerDF = ner_classification(sentDF)*
>>>
>>>
>>> This is the code of the functions related to obtaining the NER, the
>>> "main call" and the UDF function.
>>>
>>> *nerModel = spacy.load(**"en_core_web_sm"**)
>>>
>>> **#main call, applies the UDF function to every tweet from the "tweet" 
>>> column**def* *ner_classification**(**words**):
>>> ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>>> words = words.withColumn(**"nerlist"**, ner_list(**"tweet"**))
>>> **return** words
>>>
>>> **#udf function**def* *obtain_ner_udf**(**words**):
>>> **#if the tweet is empty return None*
>>> *if** words == **""**:
>>> **return* *None*
>>> *#else: applying the NER model (Spacy en_core_web_sm)**
>>> entities = nerModel(words)
>>>
>>> **#returns a list of the form ['entity1_label1', 'entity2_label2',...]*
>>> *return** [ word.text + **'_'** + word.label_ **for** word **in** 
>>> entities.ents ]*
>>>
>>>
>>>
>>> And lastly I map each entity with the sentiment from its tweet and
>>> obtain the average sentiment of the entity and the number of appearances.
>>>
>>> *flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
>>> flattenedNER.registerTempTable(**"df"**)
>>>
>>>
>>> querySelect = **"SELECT col as entity, avg(sentiment) as sentiment, 
>>> count(col) as count FROM df GROUP BY col"**
>>> finalDF = spark.sql(querySelect)
>>>
>>> query = 
>>> finalDF.writeStream.foreachBatch(processBatch).outputMode(**"complete"**).start()*
>>>
>>>
>>> The resulting DF is processed with a function that separates each column
>>> in a list and prints it.
>>>
>>> *def* *processBatch**(**df**,* *epoch_id**):**entities* *=* 
>>> *[**str**(**t**.**entity**)* *for* *t* *in* 
>>> *df**.**select**(**"entity"**).**collect**()]*
>>> *sentiments* *=* *[**float**(**t**.**sentiment**)* *for* *t* *in* 
>>> 

Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-18 Thread Bjørn Jørgensen
When did SpaCy have support for Spark?

Try Spark NLP  it`s made for spark. They have
a lot of notebooks at https://github.com/JohnSnowLabs/spark-nlp and they
public user guides at
https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59




man. 18. apr. 2022 kl. 16:17 skrev Sean Owen :

> It looks good, are you sure it even starts? the problem I see is that you
> send a copy of the model from the driver for every task. Try broadcasting
> the model instead. I'm not sure if that resolves it but would be a good
> practice.
>
> On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla <
> xavier.gervi...@datapta.com> wrote:
>
>> Hi Team,
>> 
>>
>> I'm developing a project that retrieves tweets on a 'host' app, streams
>> them to Spark and with different operations with DataFrames obtains the
>> Sentiment of the tweets and their entities applying a Sentiment model and a
>> NER model respectively.
>>
>> The problem I've come across is that when applying the NER model, the RAM
>> consumption increases until the program stops with a memory error because
>> there's no memory left to execute. In addition, on SparkUI I've seen that
>> there's only one executor running, the executor driver, but using htop on
>> the terminal I see that the 8 cores of the instance are executing at 100%.
>>
>> The SparkSession is only configured to receive the tweets from the socket
>> that connects with the second program that sends the tweets. The DataFrame
>> goes through some processing to obtain other properties of the tweet like
>> its sentiment (which causes no error even with less than 8GB of RAM) and
>> then the NER is applied.
>>
>> *spark = SparkSession.builder.appName(**"TwitterStreamApp"**).getOrCreate()
>> rawTweets = spark.readStream.**format**(**"socket"**).option(**"host"**, 
>> **"localhost"**).option(**"port"**,**9008**).load()
>> tweets = rawTweets.selectExpr(**"CAST(value AS STRING)"**)
>>
>> **#prior processing of the tweets**
>> sentDF = other_processing(tweets)
>>
>> **#obtaining the column that contains the list of entities from a tweet**
>> nerDF = ner_classification(sentDF)*
>>
>>
>> This is the code of the functions related to obtaining the NER, the "main
>> call" and the UDF function.
>>
>> *nerModel = spacy.load(**"en_core_web_sm"**)
>>
>> **#main call, applies the UDF function to every tweet from the "tweet" 
>> column**def* *ner_classification**(**words**):
>> ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>> words = words.withColumn(**"nerlist"**, ner_list(**"tweet"**))
>> **return** words
>>
>> **#udf function**def* *obtain_ner_udf**(**words**):
>> **#if the tweet is empty return None*
>> *if** words == **""**:
>> **return* *None*
>> *#else: applying the NER model (Spacy en_core_web_sm)**
>> entities = nerModel(words)
>>
>> **#returns a list of the form ['entity1_label1', 'entity2_label2',...]*
>> *return** [ word.text + **'_'** + word.label_ **for** word **in** 
>> entities.ents ]*
>>
>>
>>
>> And lastly I map each entity with the sentiment from its tweet and obtain
>> the average sentiment of the entity and the number of appearances.
>>
>> *flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
>> flattenedNER.registerTempTable(**"df"**)
>>
>>
>> querySelect = **"SELECT col as entity, avg(sentiment) as sentiment, 
>> count(col) as count FROM df GROUP BY col"**
>> finalDF = spark.sql(querySelect)
>>
>> query = 
>> finalDF.writeStream.foreachBatch(processBatch).outputMode(**"complete"**).start()*
>>
>>
>> The resulting DF is processed with a function that separates each column
>> in a list and prints it.
>>
>> *def* *processBatch**(**df**,* *epoch_id**):**entities* *=* 
>> *[**str**(**t**.**entity**)* *for* *t* *in* 
>> *df**.**select**(**"entity"**).**collect**()]*
>> *sentiments* *=* *[**float**(**t**.**sentiment**)* *for* *t* *in* 
>> *df**.**select**(**"sentiment"**).**collect**()]*
>> *counts* *=* *[**int**(**row**.**asDict**()[**'count'**])* *for* *row* 
>> *in* *df**.**select**(**"count"**).**collect**()]*
>>
>> *print(**entities**,* *sentiments**,* *counts**)*
>>
>>
>> At first I tried with other NER models from Flair they have the same
>> effect, after printing the first batch memory use starts increasing until
>> it fails and stops the execution because of the memory error. When applying
>> a "simple" function instead of the NER model, such as *return
>> words.split()* on the UDF there's no such error so the data ingested
>> should not be what's causing the overload but the model.
>>
>> Is there a way to prevent the excessive RAM consumption? Why is there
>> only the driver executor and no other executors are generated? How could I
>> prevent it from collapsing when applying the NER model?
>>
>> Thanks in advance!

Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-18 Thread Sean Owen
It looks good, are you sure it even starts? the problem I see is that you
send a copy of the model from the driver for every task. Try broadcasting
the model instead. I'm not sure if that resolves it but would be a good
practice.
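
A minimal sketch of that suggestion, assuming the spaCy pipeline pickles cleanly
(en_core_web_sm normally does); the broadcast is created once on the driver and the
UDF only references the handle, so executors deserialize the model once instead of
shipping a copy with every task:

import spacy
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.appName("NERBroadcast").getOrCreate()

# Load the model once on the driver and broadcast it
nerModelBc = spark.sparkContext.broadcast(spacy.load("en_core_web_sm"))

def obtain_ner(text):
    if not text:
        return None
    doc = nerModelBc.value(text)  # executors reuse the broadcast copy
    return [ent.text + "_" + ent.label_ for ent in doc.ents]

ner_list = udf(obtain_ner, ArrayType(StringType()))

If the model turns out not to serialize well, broadcasting just the model name and
loading it once per executor (for example inside mapPartitions or a pandas UDF) is a
common fallback.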

On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla 
wrote:

> Hi Team,
> 
>
> I'm developing a project that retrieves tweets on a 'host' app, streams
> them to Spark and with different operations with DataFrames obtains the
> Sentiment of the tweets and their entities applying a Sentiment model and a
> NER model respectively.
>
> The problem I've come across is that when applying the NER model, the RAM
> consumption increases until the program stops with a memory error because
> there's no memory left to execute. In addition, on SparkUI I've seen that
> there's only one executor running, the executor driver, but using htop on
> the terminal I see that the 8 cores of the instance are executing at 100%.
>
> The SparkSession is only configured to receive the tweets from the socket
> that connects with the second program that sends the tweets. The DataFrame
> goes through some processing to obtain other properties of the tweet like
> its sentiment (which causes no error even with less than 8GB of RAM) and
> then the NER is applied.
>
> *spark = SparkSession.builder.appName(**"TwitterStreamApp"**).getOrCreate()
> rawTweets = spark.readStream.**format**(**"socket"**).option(**"host"**, 
> **"localhost"**).option(**"port"**,**9008**).load()
> tweets = rawTweets.selectExpr(**"CAST(value AS STRING)"**)
>
> **#prior processing of the tweets**
> sentDF = other_processing(tweets)
>
> **#obtaining the column that contains the list of entities from a tweet**
> nerDF = ner_classification(sentDF)*
>
>
> This is the code of the functions related to obtaining the NER, the "main
> call" and the UDF function.
>
> *nerModel = spacy.load(**"en_core_web_sm"**)
>
> **#main call, applies the UDF function to every tweet from the "tweet" 
> column**def* *ner_classification**(**words**):
> ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
> words = words.withColumn(**"nerlist"**, ner_list(**"tweet"**))
> **return** words
>
> **#udf function**def* *obtain_ner_udf**(**words**):
> **#if the tweet is empty return None*
> *if** words == **""**:
> **return* *None*
> *#else: applying the NER model (Spacy en_core_web_sm)**
> entities = nerModel(words)
>
> **#returns a list of the form ['entity1_label1', 'entity2_label2',...]*
> *return** [ word.text + **'_'** + word.label_ **for** word **in** 
> entities.ents ]*
>
>
>
> And lastly I map each entity with the sentiment from its tweet and obtain
> the average sentiment of the entity and the number of appearances.
>
> *flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
> flattenedNER.registerTempTable(**"df"**)
>
>
> querySelect = **"SELECT col as entity, avg(sentiment) as sentiment, 
> count(col) as count FROM df GROUP BY col"**
> finalDF = spark.sql(querySelect)
>
> query = 
> finalDF.writeStream.foreachBatch(processBatch).outputMode(**"complete"**).start()*
>
>
> The resulting DF is processed with a function that separates each column
> in a list and prints it.
>
> *def* *processBatch**(**df**,* *epoch_id**):**entities* *=* 
> *[**str**(**t**.**entity**)* *for* *t* *in* 
> *df**.**select**(**"entity"**).**collect**()]*
> *sentiments* *=* *[**float**(**t**.**sentiment**)* *for* *t* *in* 
> *df**.**select**(**"sentiment"**).**collect**()]*
> *counts* *=* *[**int**(**row**.**asDict**()[**'count'**])* *for* *row* 
> *in* *df**.**select**(**"count"**).**collect**()]*
>
> *print(**entities**,* *sentiments**,* *counts**)*
>
>
> At first I tried with other NER models from Flair they have the same
> effect, after printing the first batch memory use starts increasing until
> it fails and stops the execution because of the memory error. When applying
> a "simple" function instead of the NER model, such as *return
> words.split()* on the UDF there's no such error so the data ingested
> should not be what's causing the overload but the model.
>
> Is there a way to prevent the excessive RAM consumption? Why is there only
> the driver executor and no other executors are generated? How could I
> prevent it from collapsing when applying the NER model?
>
> Thanks in advance!
>
>


[Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-18 Thread Xavier Gervilla
Hi Team,
https://stackoverflow.com/questions/71841814/is-there-a-way-to-prevent-excessive-ram-consumption-with-the-spark-configuration



I'm developing a project that retrieves tweets on a 'host' app, streams them to 
Spark and 
with different operations with DataFrames obtains the Sentiment of the 
tweets and their entities applying a Sentiment model and a NER model 
respectively.



The problem I've come across is that when applying the NER model, the 
RAM consumption increases until the program stops with a memory error 
because there's no memory left to execute. In addition, on SparkUI I've seen 
that 
there's only one executor running, the executor driver, but using htop on the 
terminal I see that the 8 cores of the instance are executing at 100%.




The SparkSession is only configured to receive the tweets from the socket
 that connects with the second program that sends the tweets. The 
DataFrame goes through some processing to obtain other properties of the
 tweet like its sentiment (which causes no error even with less than 8GB
 of RAM) and then the NER is applied.

spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
rawTweets = spark.readStream.format("socket").option("host", 
"localhost").option("port",9008).load()
tweets = rawTweets.selectExpr("CAST(value AS STRING)")

#prior processing of the tweets
sentDF = other_processing(tweets)

#obtaining the column that contains the list of entities from a tweet
nerDF = ner_classification(sentDF)



This is the code of the functions related to obtaining the NER, the "main call" 
and the UDF function.

nerModel = spacy.load("en_core_web_sm")

#main call, applies the UDF function to every tweet from the "tweet" column
def ner_classification(words):
ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
words = words.withColumn("nerlist", ner_list("tweet"))
return words

#udf function
def obtain_ner_udf(words):
#if the tweet is empty return None
if words == "":
return None
#else: applying the NER model (Spacy en_core_web_sm)
entities = nerModel(words)

#returns a list of the form ['entity1_label1', 'entity2_label2',...]
return [ word.text + '_' + word.label_ for word in entities.ents ]






And lastly I map each entity with the sentiment from its tweet and 
obtain the average sentiment of the entity and the number of 
appearances.



flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
flattenedNER.registerTempTable("df")


querySelect = "SELECT col as entity, avg(sentiment) as sentiment, count(col) as 
count FROM df GROUP BY col"
finalDF = spark.sql(querySelect)

query = 
finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()



The resulting DF is processed with a function that separates each column in a 
list and prints it.



def processBatch(df, epoch_id):
entities = [str(t.entity) for t in df.select("entity").collect()]

sentiments = [float(t.sentiment) for t in df.select("sentiment").collect()]

counts = [int(row.asDict()['count']) for row in 
df.select("count").collect()]


    print(entities, sentiments, counts) 





At first I tried with other NER models from Flair they have the same effect, 
after printing the first batch memory use starts increasing until it fails and 
stops the execution because of the memory error. When applying a "simple" 
function instead of the NER model, such as return words.split() on the UDF 
there's no such error so  the data ingested should not be what's causing the 
overload but the model.



Is there a way to prevent the excessive RAM consumption? Why is there only the 
driver executor and no other executors are generated? How could I prevent it 
from 
collapsing when applying the NER model?



Thanks in advance!

[Spark Streaming]: Why planInputPartitions is called multiple times for each micro-batch in Spark 3?

2022-04-13 Thread Hussain, Saghir
Hi All

While upgrading our custom streaming data source from Spark 2.4.5 to Spark 
3.2.1, we observed that the planInputPartitions() method in MicroBatchStream is 
being called multiple times(4 in our case) for each micro-batch in Spark 3.

The Apache Spark documentation also says that :
The method planInputPartitions will be called multiple times, to launch one 
Spark job for each micro-batch in this data 
stream.

What is the reason for this?

Thanks & Regards,
Saghir Hussain


Re: protobuf data as input to spark streaming

2022-04-08 Thread Kiran Biswal
Hello Stelios

Just a gentle follow up if you can share any sample code/repo

Regards
Kiran

On Wed, Apr 6, 2022 at 3:19 PM Kiran Biswal  wrote:

> Hello Stelios
>
> Preferred language would have been Scala or pyspark but if Java is proven
> I am open to using it
>
> Any sample reference or example code link?
>
> How are you handling the protobuf to Spark dataframe conversion
> (serialization/deserialization)?
>
> Thanks
> Kiran
>
> On Wed, Apr 6, 2022, 2:38 PM Stelios Philippou  wrote:
>
>> Yes we are currently using it as such.
>> Code is in java. Will that work?
>>
>> On Wed, 6 Apr 2022 at 00:51, Kiran Biswal  wrote:
>>
>>> Hello Experts
>>>
>>> Has anyone used protobuf (proto3) encoded data (from kafka) as input
>>> source and been able to do spark structured streaming?
>>>
>>> I would appreciate if you can share any sample code/example
>>>
>>> Regards
>>> Kiran
>>>



Re: protobuf data as input to spark streaming

2022-04-06 Thread Kiran Biswal
Hello Stelios

Preferred language would have been Scala or pyspark but if Java is proven I
am open to using it

Any sample reference or example code link?

How are you handling the protobuf to Spark dataframe conversion
(serialization/deserialization)?

Thanks
Kiran

On Wed, Apr 6, 2022, 2:38 PM Stelios Philippou  wrote:

> Yes we are currently using it as such.
> Code is in java. Will that work?
>
> On Wed, 6 Apr 2022 at 00:51, Kiran Biswal  wrote:
>
>> Hello Experts
>>
>> Has anyone used protobuf (proto3) encoded data (from kafka) as input
>> source and been able to do spark structured streaming?
>>
>> I would appreciate if you can share any sample code/example
>>
>> Regards
>> Kiran
>>
>>>


Re: protobuf data as input to spark streaming

2022-04-06 Thread Stelios Philippou
Yes we are currently using it as such.
Code is in java. Will that work?

On Wed, 6 Apr 2022 at 00:51, Kiran Biswal  wrote:

> Hello Experts
>
> Has anyone used protobuf (proto3) encoded data (from kafka) as input
> source and been able to do spark structured streaming?
>
> I would appreciate if you can share any sample code/example
>
> Regards
> Kiran
>
>>


protobuf data as input to spark streaming

2022-04-05 Thread Kiran Biswal
Hello Experts

Has anyone used protobuf (proto3) encoded data (from kafka) as input source
and been able to do spark structured streaming?

I would appreciate if you can share any sample code/example

Regards
Kiran
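
A minimal sketch of one way to do this in PySpark, assuming Spark 3.4+ where
pyspark.sql.protobuf.functions.from_protobuf is available, the spark-protobuf and
spark-sql-kafka packages on the classpath, and a compiled descriptor file for the
message; broker, topic, message name and paths are placeholders. On older versions the
usual route is a UDF that calls the protoc-generated Python classes to deserialize the
value bytes.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.protobuf.functions import from_protobuf

spark = SparkSession.builder.appName("ProtobufStream").getOrCreate()

raw = (spark.readStream.format("kafka")
       .option("kafka.bootstrap.servers", "broker:9092")  # placeholder broker
       .option("subscribe", "events-proto")               # placeholder topic
       .load())

# Decode the proto3 payload into a struct column using a compiled descriptor file
decoded = raw.select(
    from_protobuf(col("value"), "Event",                  # placeholder message name
                  descFilePath="/path/to/events.desc").alias("event"))

query = decoded.select("event.*").writeStream.format("console").start()
query.awaitTermination()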

>


python API in Spark-streaming-kafka spark 3.2.1

2022-03-07 Thread Wiśniewski Michał
Hi,
I've read in the documentation, that since spark 3.2.1 python API for 
spark-streaming-kafka is back in the game.
https://spark.apache.org/docs/3.2.1/streaming-programming-guide.html#advanced-sources

But in the Kafka Integration Guide there is no documentation for the python API.
https://spark.apache.org/docs/3.2.1/streaming-kafka-0-10-integration.html

Where can I find basic information on how to read from/write to Kafka in 
Spark Streaming in Spark 3.2.1 using Python?



---
Regards,
Michał Wiśniewski



Software Developer
tel.: +48 58 52-49-181 , mobile: +48 571322090






Wirtualna Polska Media S.A. | http://onas.wp.pl
02-092 Warszawa, ul. Żwirki i Wigury 16 | NIP 5272645593





Re: Spark Streaming | Dynamic Action Support

2022-03-03 Thread Mich Talebzadeh
In short, I don't think there is such a possibility. However, there is the
option of shutting down Spark gracefully with the checkpoint directory enabled.
That way you can re-submit the modified code, which will pick up the
BatchID from where it was left off, assuming the topic is the same. See the
thread "How to gracefully shutdown Spark Structured Streaming" in
https://lists.apache.org/list.html?user@spark.apache.org
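
For illustration only, one commonly shared pattern for such a graceful stop looks roughly like this (a sketch, not that thread's exact code; shutdownRequested() stands for whatever external stop signal you choose, e.g. a marker file or a control-topic message):

import org.apache.spark.sql.streaming.StreamingQuery

def stopGracefully(query: StreamingQuery, shutdownRequested: () => Boolean): Unit = {
  while (query.isActive) {
    if (shutdownRequested() && !query.status.isTriggerActive && !query.status.isDataAvailable) {
      query.stop()   // nothing in flight, so stop; the checkpoint lets the next run resume
    }
    query.awaitTermination(10000)   // re-check every 10 seconds
  }
}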

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 3 Mar 2022 at 15:49, Mich Talebzadeh 
wrote:

> What is the definition of action here?
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 3 Mar 2022 at 10:56, Pappu Yadav  wrote:
>
>> Hi,
>>
>> Is there any way I can add/delete actions/jobs dynamically in a running
>> spark streaming job.
>> I will call an API and execute only the configured actions in the system.
>>
>> Eg . In the first batch suppose there are 5 actions in the spark
>> application.
>> Now suppose some configuration is changed and one action is added and one
>> is deleted.
>> How can i handle this in the spark streaming job without restarting the
>> application
>>
>


Spark Streaming | Dynamic Action Support

2022-03-03 Thread Pappu Yadav
Hi,

Is there any way I can add/delete actions/jobs dynamically in a running
spark streaming job.
I will call an API and execute only the configured actions in the system.

Eg . In the first batch suppose there are 5 actions in the spark
application.
Now suppose some configuration is changed and one action is added and one
is deleted.
How can i handle this in the spark streaming job without restarting the
application


Failed to construct kafka consumer, Failed to load SSL keystore + Spark Streaming

2022-02-12 Thread joyan sil
Hi All,

I am trying to read from Kafka using Spark streaming from spark-shell but am
getting the below error. Any suggestions to fix this are much appreciated.

I am running from spark-shell, hence it is client mode and the files are
available in the local filesystem.

I tried to access the files as shown below, but I still get the same error.
Any suggestions to make this work from spark-shell?

spark-shell \
--packages
org.apache.hudi:hudi-spark-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:2.4.7,org.apache.avro:avro:1.8.2,org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.8
\
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf
'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
\
--conf "spark.sql.hive.convertMetastoreParquet=false" \
--files
/local_dir/kafka.client.truststore.jks,/local_dir/test.kafka.client.xxx.com.jks

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server1,server2")
.option("subscribe", "wm-cth-salesstreams")
.option("startingOffsets", "latest")
.option("maxOffsetsPerTrigger", 100)
.option("failOnDataLoss", false)
.option("kafka.security.protocol","SSL")

// tried both of these for the truststore location:
// .option("kafka.ssl.truststore.location", "/local_dir/kafka.client.truststore.jks")
.option("kafka.ssl.truststore.location", "file://" + org.apache.spark.SparkFiles.get("/local_dir/kafka.client.truststore.jks"))
.option("kafka.ssl.truststore.password","pwd")
.option("kafka.ssl.keystore.password","pwd")
// and both of these for the keystore location:
// .option("kafka.ssl.keystore.location", "/local_dir/test.kafka.client.xxx.com.jks")
.option("kafka.ssl.keystore.location", "file://" + org.apache.spark.SparkFiles.get("/local_dir/test.kafka.client.xxx.com.jks"))
.load

Exception:
22/02/12 15:57:03 INFO org.apache.spark.sql.kafka010.KafkaMicroBatchReader:
Initial offsets:
{"wm-cth-salesstreams":{"23":167267092,"59":167276860,"50":167258479,"32":167281169,"41":167272687,"53":167256274,"17":167269072,"8":167282513,"35":167298150,"44":167244867,"26":167242913,"11":167283073,"56":167304913,"29":167307963,"38":167287380,"47":167312027,"20":167280591,"2":167248970,"5":167308945,"14":167231970,"46":167267534,"55":167275890,"58":167287699,"49":167245856,"40":167247065,"13":167249522,"4":167301468,"22":167269011,"31":167349129,"16":167266948,"7":167272315,"52":167276042,"43":167273593,"25":167232737,"34":167264787,"10":167265137,"37":167252586,"1":167312454,"19":167247237,"28":167280632,"54":167307408,"45":167280214,"27":167249248,"36":167282370,"18":167223580,"9":167223643,"57":167340670,"21":167277793,"48":167273190,"3":167294084,"12":167299093,"30":167236443,"39":167311503,"15":167274468,"42":167292272,"51":167252733,"24":167245661,"6":167241738,"33":167224273,"0":167295530}}
Caused by: org.apache.kafka.common.KafkaException:
org.apache.kafka.common.KafkaException:
org.apache.kafka.common.KafkaException: Failed to load SSL keystore
/local_dir/test.kafka.client.xxx.com.jks of type JKS
at
org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:64)
at
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
at
org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
at
org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:713)
... 51 more
Caused by: org.apache.kafka.common.KafkaException:
org.apache.kafka.common.KafkaException: Failed to load SSL keystore
/local_dir/test.kafka.client.xxx.com.jks of type JKS
at
org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:137)
at
org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:62)
... 55 more
*Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL
keystore /local_dir/test.kafka.client.xxx.com.jks of type JKS*
at
org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:330)
at
org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:218)
at
org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:135)
... 56 more
*Caused by: java.io.FileNotFoundException:
/local_dir/test.kafka.client.xxx.com.jks (No such file or directory)*
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.(FileInputStream.java:138)
at java.io.FileInputStream.(FileInputStream.java:93)
at
org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:323)
... 58 more
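
Not an authoritative fix, but worth checking: --files ships the two jks files into every executor's working directory, so the executor-side Kafka options are usually pointed at the bare file names rather than the driver-local path. A sketch of how that is commonly written (servers, topic and passwords as in the question):

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server1,server2")
  .option("subscribe", "wm-cth-salesstreams")
  .option("kafka.security.protocol", "SSL")
  // bare file names: files distributed with --files land in each container's working directory
  .option("kafka.ssl.truststore.location", "kafka.client.truststore.jks")
  .option("kafka.ssl.truststore.password", "pwd")
  .option("kafka.ssl.keystore.location", "test.kafka.client.xxx.com.jks")
  .option("kafka.ssl.keystore.password", "pwd")
  .load()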


Re: Kafka to spark streaming

2022-01-30 Thread Gourav Sengupta
Hi Amit,

before answering your question, I am just trying to understand it.

I am not exactly clear on how the Akka application, Kafka and the Spark
Streaming application sit together, and what exactly you are trying to
achieve?

Can you please elaborate?

Regards,
Gourav


On Fri, Jan 28, 2022 at 10:14 PM Amit Sharma  wrote:

> Hello everyone, we have spark streaming application. We send request to
> stream through Akka actor using Kafka topic. We wait for response as it is
> real time. Just want a suggestion is there any better option like Livy
> where we can send and receive request to spark streaming.
>
>
> Thanks
> Amit
>


Re: Kafka to spark streaming

2022-01-29 Thread Amit Sharma
Thanks Mich. The link you shared has only two options, Kafka and Socket.


Thanks
Amit

On Sat, Jan 29, 2022 at 3:49 AM Mich Talebzadeh 
wrote:

> So you have a classic architecture with spark receiving events through a
> kafka topic via kafka-spark-connector, do something with it and send data
> out to the consumer. Are you using Spark structured streaming here with
> batch streaming? check
>
>
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#structured-streaming-programming-guide
>
> HTH
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 28 Jan 2022 at 22:14, Amit Sharma  wrote:
>
>> Hello everyone, we have spark streaming application. We send request to
>> stream through Akka actor using Kafka topic. We wait for response as it is
>> real time. Just want a suggestion is there any better option like Livy
>> where we can send and receive request to spark streaming.
>>
>>
>> Thanks
>> Amit
>>
>


Re: Kafka to spark streaming

2022-01-29 Thread Mich Talebzadeh
So you have a classic architecture with spark receiving events through a
kafka topic via kafka-spark-connector, do something with it and send data
out to the consumer. Are you using Spark structured streaming here with
batch streaming? check

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#structured-streaming-programming-guide

HTH

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 28 Jan 2022 at 22:14, Amit Sharma  wrote:

> Hello everyone, we have spark streaming application. We send request to
> stream through Akka actor using Kafka topic. We wait for response as it is
> real time. Just want a suggestion is there any better option like Livy
> where we can send and receive request to spark streaming.
>
>
> Thanks
> Amit
>


Kafka to spark streaming

2022-01-28 Thread Amit Sharma
Hello everyone, we have a Spark streaming application. We send requests to the
stream through an Akka actor using a Kafka topic, and we wait for the response as it is
real time. Just want a suggestion: is there any better option, like Livy,
where we can send requests to and receive responses from Spark streaming?


Thanks
Amit


[spark streaming] how to connect to rabbitmq with spark streaming.

2021-10-04 Thread Joris Billen
Hi,
I am looking for someone who has made a spark streaming job that connects to 
rabbitmq.
There is a lot of documentation on how to make a connection with the Java API (for example 
here: https://www.rabbitmq.com/api-guide.html#connecting), but I am looking 
for a recent working example for Spark Streaming (which will save the incoming 
data in a DStream).
Tried so far: this looks close, but throws errors: 
https://github.com/Stratio/spark-rabbitmq/issues. Also tried with MQTT (we 
get an error that the protocol is amqp:// and not tcp:// or ssl://; we also 
noticed that for MQTT the plugin needs to be enabled on the RabbitMQ server's 
side, and since it is a 3rd-party server we don't control this).


Thanks for any input!



Re: question regarding spark streaming continuous processing

2021-09-04 Thread Antonio Si
Hi all,

I would like to follow up on this question. Any information would be very 
helpful.

Thanks.

Antonio.

On 2021/09/01 18:50:34, Antonio Si  wrote: 
> Hi,
> 
> Hi all, I have a couple questions regarding continuous processing:
> 
> 1.  What is the plan for continuous processing moving forward? Will this 
> eventually be released as a production feature as it seems it is still 
> experimental? 
> 2.  In microbatch streaming, there is a StreamingQueryListener and we will be 
> able to obtain the kafka offset of the last processed microbatch. Do we have 
> anything similar for continuous processing?
> 
> Any information would be helpful.
> 
> Thanks and regards,
> 
> Antonio.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



spark streaming to jdbc

2021-09-03 Thread igyu
val lines = spark.readStream
  .format("socket")
  //  .schema(StructType(schemas))
  .option("host", "10.3.87.23")
  .option("port", )
  .load()
  .selectExpr("CAST(value AS STRING)").as[(String)]

val DF = lines.map(x => {
  val obj = JSON.parseObject(x)
  val ls = new util.ArrayList()
  (obj.getString("timestamp"), obj.getString("msg"))
}).toDF("timestamp", "msg")

val q = DF.writeStream
  .foreachBatch((row, l) => {
    if (row.count() != 0) {
      row.write.format("jdbc")
        .mode(mode)
        .options(cfg)
        .save()
    }
  })
  .start()

I get an error:

Logical Plan:
Project [_1#10 AS timestamp#13, _2#11 AS msg#14]
+- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS 
_1#10, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, 
true, false) AS _2#11]
   +- MapElements , class java.lang.String, 
[StructField(value,StringType,true)], obj#9: scala.Tuple2
  +- DeserializeToObject cast(value#2 as string).toString, obj#8: 
java.lang.String
 +- Project [cast(value#0 as string) AS value#2]
+- StreamingExecutionRelation TextSocketV2[host: 10.3.87.23, port: 
], [value#0]

at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.NullPointerException
at java.util.regex.Matcher.getTextLength(Matcher.java:1283)
at java.util.regex.Matcher.reset(Matcher.java:309)
at java.util.regex.Matcher.(Matcher.java:229)
at java.util.regex.Pattern.matcher(Pattern.java:1093)
at scala.util.matching.Regex.findFirstIn(Regex.scala:388)
at 
org.apache.spark.util.Utils$$anonfun$redact$1$$anonfun$apply$15.apply(Utils.scala:2695)
at 
org.apache.spark.util.Utils$$anonfun$redact$1$$anonfun$apply$15.apply(Utils.scala:2695)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.util.Utils$$anonfun$redact$1.apply(Utils.scala:2695)
at org.apache.spark.util.Utils$$anonfun$redact$1.apply(Utils.scala:2693)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.util.Utils$.redact(Utils.scala:2693)
at org.apache.spark.util.Utils$.redact(Utils.scala:2660)
at 
org.apache.spark.sql.internal.SQLConf$$anonfun$redactOptions$1.apply(SQLConf.scala:2071)
at 
org.apache.spark.sql.internal.SQLConf$$anonfun$redactOptions$1.apply(SQLConf.scala:2071)
at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.internal.SQLConf.redactOptions(SQLConf.scala:2071)
at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.simpleString(SaveIntoDataSourceCommand.scala:52)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.verboseString(QueryPlan.scala:177)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:548)
at org.apache.spark.sql.catalyst.trees.TreeNode.treeString(TreeNode.scala:472)
at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$4.apply(QueryExecution.scala:197)
at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$4.apply(QueryExecution.scala:197)
at 
org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:99)
at 
org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:197)
at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:75)
at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
at com.join.jdbc.writer.JdbcWriter$$anonfun$1.apply(JdbcWriter.scala:46)
at com.join.jdbc.writer.JdbcWriter$$anonfun$1.apply(JdbcWriter.scala:41)
at 
org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:534)
at 
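
A guess based on the stack trace: the NullPointerException comes out of Utils.redact while Spark builds the query-plan string for the jdbc save, which typically happens when one of the entries in the options map passed to .options(cfg) is null. A small sketch of the kind of options map and guard that would surface this (values are hypothetical):

val cfg = Map(
  "url"      -> "jdbc:postgresql://dbhost:5432/mydb",   // hypothetical connection details
  "dbtable"  -> "public.events",
  "user"     -> "spark",
  "password" -> "secret",
  "driver"   -> "org.postgresql.Driver"
)
// fail fast if anything is unset, instead of hitting the NPE deep inside Spark
require(cfg.values.forall(_ != null), "jdbc options must not contain null values")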

question regarding spark streaming continuous processing

2021-09-01 Thread Antonio Si
Hi,

Hi all, I have a couple questions regarding continuous processing:

1.  What is the plan for continuous processing moving forward? Will this 
eventually be released as a production feature as it seems it is still 
experimental? 
2.  In microbatch streaming, there is a StreamingQueryListener and we will be 
able to obtain the kafka offset of the last processed microbatch. Do we have 
anything similar for continuous processing?

Any information would be helpful.

Thanks and regards,

Antonio.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming with Files

2021-04-30 Thread muru
Yes, set trigger(once=True) on all streaming queries and Spark will treat each
run as a batch. Then you can use any scheduler (e.g. Airflow) to run it on
whatever time window you like. With checkpointing, the next run will start
processing files from the last checkpoint.
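
A minimal Scala sketch of that run-once pattern (the schema, formats and paths here are assumptions; on Spark 3.3+ Trigger.AvailableNow() plays the same role):

import org.apache.spark.sql.streaming.Trigger

// File sources need an explicit schema; eventSchema and the paths are hypothetical.
val df = spark.readStream
  .schema(eventSchema)
  .json("s3a://my-bucket/cdc-landing/")

val query = df.writeStream
  .format("parquet")
  .option("path", "s3a://my-bucket/cdc-processed/")
  .option("checkpointLocation", "s3a://my-bucket/checkpoints/cdc/")
  .trigger(Trigger.Once())   // process everything new since the last checkpoint, then stop
  .start()

query.awaitTermination()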

On Fri, Apr 23, 2021 at 8:13 AM Mich Talebzadeh 
wrote:

> Interesting.
>
> If we go back to classic Lambda architecture on premise, you could Flume
> API to Kafka to add files to HDFS in time series bases.
>
> Most higher CDC vendors do exactly that. Oracle GoldenGate (OGG) classic
> gets data from Oracle redo logs and sends them to subscribers. One can
> deploy OGC for Big Data to enable these files to be read and processed for
> Kafka, Hive, HDFS etc.
>
> So let us assume that we create these files and stream them on object
> storage in Cloud. Then we can use Spark Structure Streaming (SSS) to act as
> ETL tool. Assuming that streaming interval to be 10 minutes, we can still
> read them but ensure that we only trigger SSS reads every 4 hours.
>
>  writeStream. \
>  outputMode('append'). \
>  option("truncate", "false"). \
>  foreachBatch(sendToSink). \
>  trigger(processingTime='14400 seconds'). \
>  queryName('readFiles'). \
>  start()
>
> This will ensure that spark only processes them every 4 hours.
>
>
> HTH
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 23 Apr 2021 at 15:40, ayan guha  wrote:
>
>> Hi
>>
>> In one of the spark summit demo, it is been alluded that we should think
>> batch jobs in streaming pattern, using "run once" in a schedule.
>> I find this idea very interesting and I understand how this can be
>> achieved for sources like kafka, kinesis or similar. in fact we have
>> implemented this model for cosmos changefeed.
>>
>> My question is: can this model extend to file based sources? I understand
>> it can be for append only file  streams. The use case I have is: A CDC tool
>> like aws dms or shareplex or similar writing changes to a stream of files,
>> in date based folders. So it just goes on like T1, T2 etc folders. Also,
>> lets assume files are written every 10 mins, but I want to process them
>> every 4 hours.
>> Can I use streaming method so that it can manage checkpoints on its own?
>>
>> Best - Ayan
>> --
>> Best Regards,
>> Ayan Guha
>>
>


Re: Spark Streaming non functional requirements

2021-04-27 Thread Mich Talebzadeh
Forgot to add, under non-functional requirements, the following heading:



   - *Supportability and Maintainability*

Someone queried the other day on how to shutdown a streaming job
gracefully, meaning wait until such time as the "current queue" including
backlog is drained and all processing is completed.

I have come back with a suggested solution to implement this feature and
raised it as a topic in spark-developers list

http://apache-spark-developers-list.1001551.n3.nabble.com/How-to-gracefully-shutdown-Spark-Structured-Streaming-tp31171.html

Regardless, this feature needs to be a consideration as well.

HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 27 Apr 2021 at 15:16, ashok34...@yahoo.com.INVALID
 wrote:

> Hello Mich
>
> Thank you for your great explanation.
>
> Best
>
> A.
>
> On Tuesday, 27 April 2021, 11:25:19 BST, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
>
> Hi,
>
> Any design (in whatever framework) needs to consider both Functional and
> non-functional requirements. Functional requirements are those which are
> related to the technical functionality of the system that we cover daily in
> this forum. The non-functional requirement is a requirement that
> specifies criteria that can be used to judge the operation of a system
> conditions, rather than specific behaviours.  From my experience the
> non-functional requirements are equally important and in some cases they
> are underestimated when systems go to production. Probably, most
> importantly they need to cover the following:
>
>
>- *Processing time meeting a service-level agreement (SLA). *
>
>   You can get some of this from Spark GUI. Are you comfortably satisfying
> the requirements? How about total delay, Back pressure etc. Are you within
> your SLA. In streaming applications, there is no such thing as an answer
> which is supposed to be late and correct. The timeliness is part of the
> application. If we get the right answer too slowly it becomes useless or
> wrong. We also need to be aware of latency trades off with throughput.
>
>- *Capacity, current and forecast. *
>
>   What is the current capacity? Have you accounted for extra demands,
> sudden surge and loads such as year-end. Can your pipeline handle 1.6-2
> times the current load
>
>- *Scalability*
>
>   How does your application scale if you have to handle multiple topics or
> new topics added at later stages? Scalability also
> includes additional nodes, on-prem or having the ability to add more
> resources such as Google Dataproc compute engines etc
>
>- *Supportability and Maintainability*
>
>   Have you updated docs and procedures in Confluence or equivalent or
> they are typically a year old :).  Is there any single point of failure due
> to skill set? Can ops support the design and maintain BAU themselves. How
> about training and hand-over
>
>- *Disaster recovery and Fault tolerance*
>
>   What provisions are made for disaster recovery. Is there any single
> point of failure in the design (end to end pipeline). Are you using
> standalone dockers or Google Kubernetes Engine (GKE or equivalent)
>
> HTH
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 26 Apr 2021 at 17:16, ashok34...@yahoo.com.INVALID
>  wrote:
>
> Hello,
>
> When we design a typical spark streaming process, the focus is to get
> functional requirements.
>
> However, I have been asked to provide non-functional requirements as well.
> Likely things I can consider are Fault tolerance and Reliability (component
> failures).  Are there a standard list of non-functional requirements for
> Spark streaming that one needs to consider and verify all?
>
> Thanking you
>
>


Re: Spark Streaming non functional requirements

2021-04-27 Thread ashok34...@yahoo.com.INVALID
 Hello Mich
Thank you for your great explanation.
Best
A.
On Tuesday, 27 April 2021, 11:25:19 BST, Mich Talebzadeh 
 wrote:  
 
 
Hi,
Any design (in whatever framework) needs to consider both Functional and 
non-functional requirements. Functional requirements are those which are 
related to the technical functionality of the system that we cover daily in 
this forum. The non-functional requirement is a requirement that specifies 
criteria that can be used to judge the operation of a system conditions, rather 
than specific behaviours.  From my experience the non-functional requirements 
are equally important and in some cases they are underestimated when systems go 
to production. Probably, most importantly they need to cover the following:
   
   - Processing time meeting a service-level agreement (SLA). 
  You can get some of this from Spark GUI. Are you comfortably satisfying the 
requirements? How about total delay, Back pressure etc. Are you within your 
SLA. In streaming applications, there is no such thing as an answer which is 
supposed to be late and correct. The timeliness is part of the application. If 
we get the right answer too slowly it becomes useless or wrong. We also need to 
be aware of latency trades off with throughput.
   
   - Capacity, current and forecast. 
  What is the current capacity? Have you accounted for extra demands, sudden 
surge and loads such as year-end. Can your pipeline handle 1.6-2 times the 
current load 
   
   - Scalability
  How does your application scale if you have to handle multiple topics or new 
topics added at later stages? Scalability also includes additional nodes, 
on-prem or having the ability to add more resources such as Google Dataproc 
compute engines etc   
   - Supportability and Maintainability
  Have you updated docs and procedures in Confluence or equivalent or they are 
typically a year old :).  Is there any single point of failure due to skill 
set? Can ops support the design and maintain BAU themselves. How about training 
and hand-over
   
   - Disaster recovery and Fault tolerance
  What provisions are made for disaster recovery. Is there any single point of 
failure in the design (end to end pipeline). Are you using standalone dockers 
or Google Kubernetes Engine (GKE or equivalent)
HTH

   view my Linkedin profile

 

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from relying 
on this email's technical content is explicitly disclaimed. The author will in 
no case be liable for any monetary damages arising from such loss, damage or 
destruction. 

 


On Mon, 26 Apr 2021 at 17:16, ashok34...@yahoo.com.INVALID 
 wrote:

Hello,
When we design a typical spark streaming process, the focus is to get 
functional requirements.
However, I have been asked to provide non-functional requirements as well. 
Likely things I can consider are Fault tolerance and Reliability (component 
failures).  Are there a standard list of non-functional requirements for Spark 
streaming that one needs to consider and verify all?
Thanking you
  

Re: Spark Streaming non functional requirements

2021-04-27 Thread Mich Talebzadeh
Hi,

Any design (in whatever framework) needs to consider both Functional and
non-functional requirements. Functional requirements are those which are
related to the technical functionality of the system that we cover daily in
this forum. The non-functional requirement is a requirement that specifies
criteria that can be used to judge the operation of a system conditions,
rather than specific behaviours.  From my experience the non-functional
requirements are equally important and in some cases they are
underestimated when systems go to production. Probably, most importantly
they need to cover the following:


   - *Processing time meeting a service-level agreement (SLA). *

  You can get some of this from Spark GUI. Are you comfortably satisfying
the requirements? How about total delay, Back pressure etc. Are you within
your SLA. In streaming applications, there is no such thing as an answer
which is supposed to be late and correct. The timeliness is part of the
application. If we get the right answer too slowly it becomes useless or
wrong. We also need to be aware of latency trades off with throughput.

   - *Capacity, current and forecast. *

  What is the current capacity? Have you accounted for extra demands,
sudden surge and loads such as year-end. Can your pipeline handle 1.6-2
times the current load

   - *Scalability*

  How does your application scale if you have to handle multiple topics or
new topics added at later stages? Scalability also
includes additional nodes, on-prem or having the ability to add more
resources such as Google Dataproc compute engines etc

   - *Supportability and Maintainability*

  Have you updated docs and procedures in Confluence or equivalent or
they are typically a year old :).  Is there any single point of failure due
to skill set? Can ops support the design and maintain BAU themselves. How
about training and hand-over

   - *Disaster recovery and Fault tolerance*

  What provisions are made for disaster recovery. Is there any single point
of failure in the design (end to end pipeline). Are you using standalone
dockers or Google Kubernetes Engine (GKE or equivalent)

HTH

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 26 Apr 2021 at 17:16, ashok34...@yahoo.com.INVALID
 wrote:

> Hello,
>
> When we design a typical spark streaming process, the focus is to get
> functional requirements.
>
> However, I have been asked to provide non-functional requirements as well.
> Likely things I can consider are Fault tolerance and Reliability (component
> failures).  Are there a standard list of non-functional requirements for
> Spark streaming that one needs to consider and verify all?
>
> Thanking you
>


Spark Streaming non functional requirements

2021-04-26 Thread ashok34...@yahoo.com.INVALID
Hello,
When we design a typical spark streaming process, the focus is to get 
functional requirements.
However, I have been asked to provide non-functional requirements as well. 
Likely things I can consider are Fault tolerance and Reliability (component 
failures).  Are there a standard list of non-functional requirements for Spark 
streaming that one needs to consider and verify all?
Thanking you

Re: [Spark-Streaming] moving average on categorical data with time windowing

2021-04-26 Thread Sean Owen
You might be able to do this with multiple aggregations on avg(col("col1")
== "cat1") etc, but how about pivoting the DataFrame first so that you get
columns like "cat1" being 1 or 0? You would end up with (columns x
categories) new columns if you want to count all categories in all cols, but
then it's just a simple aggregation on numeric values.
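
A minimal sketch of the first suggestion, reusing the windowed groupBy from the question (streamDF stands for the streaming DataFrame built with readStream in the question; only cat1 to cat3 are shown):

import org.apache.spark.sql.functions._

// The average of a 0/1 indicator is the fraction of rows in each window matching the
// category; several aggregate expressions inside one .agg() are a single aggregation
// step, which structured streaming allows.
val movavgDF = streamDF
  .withWatermark("sqltimestamp", "5 seconds")
  .groupBy(window(col("sqltimestamp"), "5 minutes"))
  .agg(
    avg(when(col("col1") === "cat1", 1).otherwise(0)).as("cat1_avg"),
    avg(when(col("col1") === "cat2", 1).otherwise(0)).as("cat2_avg"),
    avg(when(col("col1") === "cat3", 1).otherwise(0)).as("cat3_avg")
  )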

On Mon, Apr 26, 2021 at 9:29 AM halil  wrote:

> Hello everyone,
>
> I am trying to apply moving average on categorical data like below, which
> is a synthetic data generated by myself.
>
> sqltimestamp,col1,col2,col3,col4,col5
>
> 1618574879,cat1,cat4,cat2,cat5,cat3
>
> 1618574880,cat1,cat3,cat4,cat2,cat5
>
> 1618574881,cat5,cat3,cat4,cat2,cat1
>
> 1618574882,cat2,cat3,cat5,cat1,cat4
>
> 1618574883,cat2,cat4,cat1,cat3,cat5
>
> 1618574884,cat1,cat2,cat5,cat4,cat3
>
> 1618574885,cat5,cat3,cat2,cat1,cat4
>
> 1618574886,cat3,cat5,cat4,cat2,cat1
>
> 1618574887,cat3,cat2,cat5,cat4,cat1
>
> 1618574888,cat1,cat5,cat3,cat2,cat4
>
>
>
>
> I like to take the average of the number of "cat1" in the column "col1"
> for each 5 minutes window according to the column "sqltimestamp". I solved
> when column is numeric but I couldn't solve it when the column is
> categorical as above.
>
>
> The code below produces rows of tuples (timestamp, count) and I cannot
> apply avg aggregate function on the result because spark does not support
> multiple aggregation functions on one streaming.
>
> val movavgDF = spark
>
>   .readStream
>
>   .schema(schema)
>
>   .option("failOnDataLoss", true)
>   .option("delimiter", ",")
>   .csv(inputParameters.csvSinkDir)
>
> .withWatermark("sqltimestamp", "5 seconds")
> .groupBy(window(col("sqltimestamp"), "1 minute").as("time_frame"))
> .agg(
> count( when( col("col1") === "cat1", 1)).as("count")
> )
> .withColumn("window_start", col("time_frame")("start").cast(TimestampType
> ))
> .drop("time_frame")
> .orderBy("window_start")
>
>
> After my searches on the net, I have come to the conclusion that we can do it 
> if it is not structured streaming, but I need it while streaming.
>
> I would be very happy if you can provide me a solution for this problem.
>
> Thank you very much in advance.
>
> Best,
>
> -halil.
>
>
>
>
>
>


[Spark-Streaming] moving average on categorical data with time windowing

2021-04-26 Thread halil
Hello everyone,

I am trying to apply moving average on categorical data like below, which
is a synthetic data generated by myself.

sqltimestamp,col1,col2,col3,col4,col5

1618574879,cat1,cat4,cat2,cat5,cat3

1618574880,cat1,cat3,cat4,cat2,cat5

1618574881,cat5,cat3,cat4,cat2,cat1

1618574882,cat2,cat3,cat5,cat1,cat4

1618574883,cat2,cat4,cat1,cat3,cat5

1618574884,cat1,cat2,cat5,cat4,cat3

1618574885,cat5,cat3,cat2,cat1,cat4

1618574886,cat3,cat5,cat4,cat2,cat1

1618574887,cat3,cat2,cat5,cat4,cat1

1618574888,cat1,cat5,cat3,cat2,cat4




I would like to take the average of the number of "cat1" in the column "col1" for
each 5-minute window according to the column "sqltimestamp". I solved this when the
column is numeric, but I couldn't solve it when the column is categorical, as
above.


The code below produces rows of tuples (timestamp, count) and I cannot
apply the avg aggregate function on the result, because Spark does not support
multiple chained aggregations on one streaming query.

val movavgDF = spark

  .readStream

  .schema(schema)

  .option("failOnDataLoss", true)
  .option("delimiter", ",")
  .csv(inputParameters.csvSinkDir)

.withWatermark("sqltimestamp", "5 seconds")
.groupBy(window(col("sqltimestamp"), "1 minute").as("time_frame"))
.agg(
count( when( col("col1") === "cat1", 1)).as("count")
)
.withColumn("window_start", col("time_frame")("start").cast(TimestampType))
.drop("time_frame")
.orderBy("window_start")


After my searches on the net, I have come to the conclusion that we
can do it if it is not structured streaming, but I need it while
streaming.

I would be very happy if you can provide me a solution for this problem.

Thank you very much in advance.

Best,

-halil.


Re: Spark Streaming with Files

2021-04-23 Thread Mich Talebzadeh
Interesting.

If we go back to the classic Lambda architecture on premise, you could use the
Flume API to Kafka to add files to HDFS on a time-series basis.

Most of the bigger CDC vendors do exactly that. Oracle GoldenGate (OGG) classic
gets data from the Oracle redo logs and sends it to subscribers. One can
deploy OGG for Big Data to enable these files to be read and processed for
Kafka, Hive, HDFS etc.

So let us assume that we create these files and stream them on object
storage in Cloud. Then we can use Spark Structure Streaming (SSS) to act as
ETL tool. Assuming that streaming interval to be 10 minutes, we can still
read them but ensure that we only trigger SSS reads every 4 hours.

 writeStream. \
 outputMode('append'). \
 option("truncate", "false"). \
 foreachBatch(sendToSink). \
 trigger(processingTime='14400 seconds'). \
 queryName('readFiles'). \
 start()

This will ensure that spark only processes them every 4 hours.


HTH

   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 23 Apr 2021 at 15:40, ayan guha  wrote:

> Hi
>
> In one of the spark summit demo, it is been alluded that we should think
> batch jobs in streaming pattern, using "run once" in a schedule.
> I find this idea very interesting and I understand how this can be
> achieved for sources like kafka, kinesis or similar. in fact we have
> implemented this model for cosmos changefeed.
>
> My question is: can this model extend to file based sources? I understand
> it can be for append only file  streams. The use case I have is: A CDC tool
> like aws dms or shareplex or similar writing changes to a stream of files,
> in date based folders. So it just goes on like T1, T2 etc folders. Also,
> lets assume files are written every 10 mins, but I want to process them
> every 4 hours.
> Can I use streaming method so that it can manage checkpoints on its own?
>
> Best - Ayan
> --
> Best Regards,
> Ayan Guha
>


Spark Streaming with Files

2021-04-23 Thread ayan guha
Hi

In one of the Spark Summit demos, it has been suggested that we should think of
batch jobs in a streaming pattern, using "run once" on a schedule.
I find this idea very interesting and I understand how this can be achieved
for sources like kafka, kinesis or similar. in fact we have implemented
this model for cosmos changefeed.

My question is: can this model extend to file based sources? I understand
it can be for append only file  streams. The use case I have is: A CDC tool
like aws dms or shareplex or similar writing changes to a stream of files,
in date based folders. So it just goes on like T1, T2 etc folders. Also,
lets assume files are written every 10 mins, but I want to process them
every 4 hours.
Can I use streaming method so that it can manage checkpoints on its own?

Best - Ayan
-- 
Best Regards,
Ayan Guha


Re: Spark streaming giving error for version 2.4

2021-03-16 Thread Attila Zsolt Piros
Hi!

I am just guessing here (as Gabor said before, we need more information /
logs), but is it possible, Renu, that you just upgraded one single jar?

Best Regards,
Attila
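
If that is the case, aligning every Spark artifact on one version (and one Scala binary version) usually clears this kind of AbstractMethodError. A build.sbt sketch, with the versions assumed from the thread:

// build.sbt: keep all Spark artifacts on the same version
val sparkVersion = "2.4.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming"            % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
)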

On Tue, Mar 16, 2021 at 11:31 AM Gabor Somogyi 
wrote:

> Well, this is not much. Please provide driver and executor logs...
>
> G
>
>
> On Tue, Mar 16, 2021 at 6:03 AM Renu Yadav  wrote:
>
>> Hi Team,
>>
>>
>> I have upgraded my spark streaming from 2.2 to 2.4 but getting below
>> error:
>>
>>
>> spark-streaming-kafka_0-10.2.11_2.4.0
>>
>>
>> scala 2.11
>>
>>
>> Any Idea?
>>
>>
>>
>> main" java.lang.AbstractMethodError
>>
>> at
>> org.apache.spark.util.ListenerBus$class.$init$(ListenerBus.scala:34)
>>
>> at
>> org.apache.spark.streaming.scheduler.StreamingListenerBus.(StreamingListenerBus.scala:30)
>>
>> at
>> org.apache.spark.streaming.scheduler.JobScheduler.(JobScheduler.scala:57)
>>
>> at
>> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:184)
>>
>> at
>> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:85)
>>
>>
>> Thanks & Regards,
>>
>> Renu Yadav
>>
>>


Re: Spark streaming giving error for version 2.4

2021-03-16 Thread Gabor Somogyi
Well, this is not much. Please provide driver and executor logs...

G


On Tue, Mar 16, 2021 at 6:03 AM Renu Yadav  wrote:

> Hi Team,
>
>
> I have upgraded my spark streaming from 2.2 to 2.4 but getting below error:
>
>
> spark-streaming-kafka_0-10.2.11_2.4.0
>
>
> scala 2.11
>
>
> Any Idea?
>
>
>
> main" java.lang.AbstractMethodError
>
> at
> org.apache.spark.util.ListenerBus$class.$init$(ListenerBus.scala:34)
>
> at
> org.apache.spark.streaming.scheduler.StreamingListenerBus.(StreamingListenerBus.scala:30)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler.(JobScheduler.scala:57)
>
> at
> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:184)
>
> at
> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:85)
>
>
> Thanks & Regards,
>
> Renu Yadav
>
>


Spark streaming giving error for version 2.4

2021-03-15 Thread Renu Yadav
Hi Team,


I have upgraded my spark streaming from 2.2 to 2.4 but getting below error:


spark-streaming-kafka_0-10.2.11_2.4.0


scala 2.11


Any Idea?



main" java.lang.AbstractMethodError

at
org.apache.spark.util.ListenerBus$class.$init$(ListenerBus.scala:34)

at
org.apache.spark.streaming.scheduler.StreamingListenerBus.(StreamingListenerBus.scala:30)

at
org.apache.spark.streaming.scheduler.JobScheduler.(JobScheduler.scala:57)

at
org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:184)

at
org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:85)


Thanks & Regards,

Renu Yadav


Re: DB Config data update across multiple Spark Streaming Jobs

2021-03-15 Thread forece85
Any suggestion on this? How can we update configuration data on all executors
without downtime?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



DB Config data update across multiple Spark Streaming Jobs

2021-03-13 Thread forece85
Hi,

We have multiple Spark jobs running on a single EMR cluster. All jobs use the
same business-related configuration, which is stored in Postgres. How can we
update this configuration data at all executors dynamically, whenever the
Postgres data changes, without Spark restarts?

We are using Kinesis for streaming. We tried creating a new Kinesis stream
called "cache", pushing a dummy event and processing it in all Spark jobs to
refresh the configuration data at all executors, but it is not working well.
Is there a better approach for this problem, or how should this be implemented correctly?

Thanks in Advance.
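
One pattern worth trying instead of the dummy-event stream (a sketch only; the DStream stream and the helpers loadConfigFromPostgres and process are hypothetical): cache the configuration on the driver with a TTL and re-broadcast it each batch, so executors pick up Postgres changes within one TTL without a restart.

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession

object ConfigCache {
  @volatile private var snapshot: Map[String, String] = Map.empty
  @volatile private var loadedAt = 0L
  private val ttlMs = 5 * 60 * 1000L   // refresh from Postgres at most every 5 minutes

  def broadcastLatest(spark: SparkSession): Broadcast[Map[String, String]] = synchronized {
    if (System.currentTimeMillis() - loadedAt > ttlMs) {
      snapshot = loadConfigFromPostgres()   // hypothetical JDBC read, collected on the driver
      loadedAt = System.currentTimeMillis()
    }
    spark.sparkContext.broadcast(snapshot)
  }

  private def loadConfigFromPostgres(): Map[String, String] = Map.empty   // placeholder
}

// In the streaming job, per micro-batch:
stream.foreachRDD { rdd =>
  val cfg = ConfigCache.broadcastLatest(spark)   // driver side, refreshed once the TTL expires
  rdd.foreachPartition { records =>
    val conf = cfg.value                         // executors read the broadcast snapshot
    records.foreach(record => process(record, conf))
  }
}

Broadcasts that are no longer referenced should eventually be cleaned up by Spark's ContextCleaner, so re-broadcasting a small config map per batch is normally acceptable.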



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming - Routing rdd to Executor based on Key

2021-03-09 Thread forece85
Not sure if Kinesis has such flexibility. What other possibilities are there
at the transformation level?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming - Routing rdd to Executor based on Key

2021-03-09 Thread forece85
Any example for this, please?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming - Routing rdd to Executor based on Key

2021-03-09 Thread Sean Owen
You can also group by the key in the transformation on each batch. But yes
that's faster/easier if it's already partitioned that way.
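
As a rough illustration of that per-batch grouping (the DStream stream and the helpers extractEventId and processGroup are hypothetical):

// Group the current batch by eventId so all events sharing an id are handled together;
// no state is carried across batches.
stream.foreachRDD { rdd =>
  rdd
    .map(event => (extractEventId(event), event))   // key each record by its eventId
    .groupByKey()                                   // RDD[(eventId, Iterable[event])]
    .foreach { case (eventId, events) =>
      processGroup(eventId, events)                 // every event with this id in this batch
    }
}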

On Tue, Mar 9, 2021 at 7:30 AM Ali Gouta  wrote:

> Do not know Kenesis, but it looks like it works like kafka. Your producer
> should implement a paritionner that makes it possible to send your data
> with the same key to the same partition. Though, each task in your spark
> streaming app will load data from the same partition in the same executor.
> I think this is the simplest way to achieve what you want to do.
>
> Best regards,
> Ali Gouta.
>
> On Tue, Mar 9, 2021 at 11:30 AM forece85  wrote:
>
>> We are doing batch processing using Spark Streaming with Kinesis with a
>> batch
>> size of 5 mins. We want to send all events with same eventId to same
>> executor for a batch so that we can do multiple events based grouping
>> operations based on eventId. No previous batch or future batch data is
>> concerned. Only Current batch keyed operation needed.
>>
>> Please help me how to achieve this.
>>
>> Thanks.
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Spark Streaming - Routing rdd to Executor based on Key

2021-03-09 Thread Ali Gouta
Do not know Kenesis, but it looks like it works like kafka. Your producer
should implement a paritionner that makes it possible to send your data
with the same key to the same partition. Though, each task in your spark
streaming app will load data from the same partition in the same executor.
I think this is the simplest way to achieve what you want to do.

Best regards,
Ali Gouta.

On Tue, Mar 9, 2021 at 11:30 AM forece85  wrote:

> We are doing batch processing using Spark Streaming with Kinesis with a
> batch
> size of 5 mins. We want to send all events with same eventId to same
> executor for a batch so that we can do multiple events based grouping
> operations based on eventId. No previous batch or future batch data is
> concerned. Only Current batch keyed operation needed.
>
> Please help me how to achieve this.
>
> Thanks.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

