Re: What is the best way to organize a join within a foreach?

2023-04-27 Thread Amit Joshi
Hi Marco,

I am not sure you will get access to the data frame inside the foreach, as the
Spark context is not serializable, if I remember correctly.

One thing you can do: use the cogroup operation on both datasets.
This will give you (key, iter(v1), iter(v2)) per key.
Then use foreachPartition to perform your task of converting to
JSON and more.

Thus, performance-wise, you can batch the records per user and also share
the same connection within each partition if needed.
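
To make that concrete, below is a minimal, self-contained sketch of the
cogroup + foreachPartition shape. The users/orders tables, the user_id key and
the crude statement string are assumptions for illustration only, not your
actual schema or serialization:

import java.util.Collections;
import org.apache.spark.api.java.function.CoGroupFunction;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class UserStatementJob {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("user-statements").getOrCreate();

    Dataset<Row> users = spark.table("users");    // one row per user (assumed table)
    Dataset<Row> orders = spark.table("orders");  // many rows per user (assumed table)

    MapFunction<Row, Long> byUserId = r -> r.getLong(r.fieldIndex("user_id"));

    // cogroup yields (key, iter(userRows), iter(orderRows)) per user_id, so each
    // user's orders arrive together and can be turned into one record.
    Dataset<String> statements = users
        .groupByKey(byUserId, Encoders.LONG())
        .cogroup(
            orders.groupByKey(byUserId, Encoders.LONG()),
            (CoGroupFunction<Long, Row, Row, String>) (userId, userRows, orderRows) -> {
              // Crude stand-in for real JSON serialization of one user's statement.
              StringBuilder sb = new StringBuilder("user " + userId + ": [");
              while (orderRows.hasNext()) {
                sb.append(orderRows.next().mkString("{", ",", "}"));
                if (orderRows.hasNext()) sb.append(", ");
              }
              sb.append("]");
              return Collections.singletonList(sb.toString()).iterator();
            },
            Encoders.STRING());

    // foreachPartition lets one connection (S3 client, Kafka producer, ...) be
    // opened per partition and reused for every user statement in it.
    statements.foreachPartition((ForeachPartitionFunction<String>) partition -> {
      // open the shared connection here
      while (partition.hasNext()) {
        String statement = partition.next();
        // write the statement to S3 / produce it to Kafka here
      }
      // close the shared connection here
    });

    spark.stop();
  }
}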

Hope this will help.

Regards
Amit


On Wed, 26 Apr, 2023, 15:58 Marco Costantini, <
marco.costant...@rocketfncl.com> wrote:

> Thanks team,
> Email was just an example. The point was to illustrate that some actions
> could be chained using Spark's foreach. In reality, this is an S3 write and
> a Kafka message production, which I think is quite reasonable for spark to
> do.
>
> To answer Ayan's first question: yes, all of a user's orders, prepared for
> each and every user.
>
> Other than the remarks that email transmission is unwise (which, as I've now
> pointed out, is irrelevant), I am not seeing an alternative to using Spark's
> foreach. Unless your proposal is for the Spark job to target 1 user and just
> run the job 1000s of times, taking the user_id as input. That doesn't sound
> attractive.
>
> Also, while we say that foreach is not optimal, I cannot find any evidence
> of it; neither here nor online. If there are any docs about the inner
> workings of this functionality, please pass them to me. I continue to
> search for them. Even late last night!
>
> Thanks for your help team,
> Marco.
>
> On Wed, Apr 26, 2023 at 6:21 AM Mich Talebzadeh 
> wrote:
>
>> Indeed, very valid points by Ayan. How is email going to handle 1000s of
>> records? As a solution architect I tend to replace users with customers, and
>> for each order there must be products, a sort of many-to-many relationship.
>> If I were a customer I would also be interested in the product details as
>> well. Sending via email sounds like a Jurassic Park solution 😗
>>
>> On Wed, 26 Apr 2023 at 10:24, ayan guha  wrote:
>>
>>> Adding to what Mich said,
>>>
>>> 1. Are you trying to send statements of all orders to all users? Or the
>>> latest order only?
>>>
>>> 2. Sending email is not a good use of Spark. Instead, I suggest using a
>>> notification service or function. Spark should write to a queue (Kafka,
>>> SQS... pick your choice here).
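>>>
>>> As a rough sketch of that hand-off using Spark's built-in Kafka sink (the
>>> topic name, bootstrap servers and the "statements" dataset below are
>>> placeholders, not your actual job):
>>>
>>> statements
>>>     .selectExpr("CAST(user_id AS STRING) AS key", "to_json(struct(*)) AS value")
>>>     .write()
>>>     .format("kafka")
>>>     .option("kafka.bootstrap.servers", "localhost:9092")
>>>     .option("topic", "user-statements")
>>>     .save();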
>>>
>>> Best regards
>>> Ayan
>>>
>>> On Wed, 26 Apr 2023 at 7:01 pm, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 Well OK, in a nutshell you want the result set for every user prepared
 and emailed to that user, right?

 This is a form of ETL where those result sets need to be posted
 somewhere. Say you create a table based on the result set prepared for each
 user. You may have many raw target tables at the end of the first ETL. How
 does this differ from using forEach? Performance-wise, forEach may not be
 optimal.
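
 A rough sketch of that first ETL step, here landing the per-user result sets
 in a single table partitioned by user rather than many separate raw tables
 (a variation on the above; the dataset and table names are placeholders):

 perUserResults
     .write()
     .mode("overwrite")
     .partitionBy("user_id")
     .saveAsTable("user_statements_staging");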

 Can you take the sample tables and try your method?

 HTH

 Mich Talebzadeh,
 Lead Solutions Architect/Engineering Lead
 Palantir Technologies Limited
 London
 United Kingdom


view my Linkedin profile
 


  https://en.everybodywiki.com/Mich_Talebzadeh



 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.




 On Wed, 26 Apr 2023 at 04:10, Marco Costantini <
 marco.costant...@rocketfncl.com> wrote:

> Hi Mich,
> First, thank you for that. Great effort put into helping.
>
> Second, I don't think this tackles the technical challenge here. I
> understand the windowing as it serves those ranks you created, but I don't
> see how the ranks contribute to the solution.
> Third, the core of the challenge is about performing this kind of
> 'statement' but for all users. In this example we target Mich, but that
> reduces the complexity by a lot! In fact, a simple join and filter would
> solve that one.
>
> Any thoughts on that? For me, the foreach is desirable because I can
> have the workers chain other actions to each iteration (send email, send
> HTTP request, etc).
>
> Thanks Mich,
> Marco.
>
> On Tue, Apr 25, 2023 at 6:06 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi Marco,
>>
>> First thoughts.
>>
>> foreach() is an action operation that iterates/loops over each element in
>> the dataset, meaning it is cursor-based. That is different from operating on
>> the dataset as a set, which is far more efficient.
>>

config: minOffsetsPerTrigger not working

2023-04-27 Thread Abhishek Singla
Hi Team,

I am using Spark Streaming to read from Kafka and write to S3.

Version: 3.1.2
Scala Version: 2.12
Spark Kafka connector: spark-sql-kafka-0-10_2.12

Dataset<Row> df =
    spark
        .readStream()
        .format("kafka")
        .options(appConfig.getKafka().getConf())
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

df.writeStream()
    .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
    .start()
    .awaitTermination();

kafka.conf = {
   "kafka.bootstrap.servers": "localhost:9092",
   "subscribe": "test-topic",
   "minOffsetsPerTrigger": 1000,
   "maxOffsetsPerTrigger": 1100,
   "maxTriggerDelay": "15m",
   "groupIdPrefix": "test",
   "startingOffsets": "latest",
   "includeHeaders": true,
   "failOnDataLoss": false
  }

spark.conf = {
   "spark.master": "spark://localhost:7077",
   "spark.app.name": "app",
   "spark.sql.streaming.kafka.useDeprecatedOffsetFetching": false,
   "spark.sql.streaming.metricsEnabled": true
 }


But these configs do not seem to be working, as I can see Spark processing
batches of 3k-15k records immediately one after another. Is there something I
am missing?

Ref:
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

Regards,
Abhishek Singla


Re: config: minOffsetsPerTrigger not working

2023-04-27 Thread Mich Talebzadeh
Is this all of your writeStream?

df.writeStream()
.foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
.start()
.awaitTermination();

What happened to the checkpoint location?

option('checkpointLocation', checkpoint_path).

example

 checkpoint_path = "file:///ssd/hduser/MDBatchBQ/chkpt"


ls -l  /ssd/hduser/MDBatchBQ/chkpt
total 24
-rw-r--r--. 1 hduser hadoop   45 Mar  1 09:27 metadata
drwxr-xr-x. 5 hduser hadoop 4096 Mar  1 09:27 .
drwxr-xr-x. 4 hduser hadoop 4096 Mar  1 10:31 ..
drwxr-xr-x. 3 hduser hadoop 4096 Apr 22 11:27 sources
drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 offsets
drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 commits

so you can see what is going on
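
In the Java API from your snippet, that would look something like the
following, reusing my checkpoint path above purely as an example (use your own
location):

df.writeStream()
    .option("checkpointLocation", "file:///ssd/hduser/MDBatchBQ/chkpt")
    .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
    .start()
    .awaitTermination();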

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 27 Apr 2023 at 15:46, Abhishek Singla 
wrote:

> Hi Team,
>
> I am using Spark Streaming to read from Kafka and write to S3.
>
> Version: 3.1.2
> Scala Version: 2.12
> Spark Kafka connector: spark-sql-kafka-0-10_2.12
>
> Dataset df =
> spark
> .readStream()
> .format("kafka")
> .options(appConfig.getKafka().getConf())
> .load()
> .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
>
> df.writeStream()
> .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
> .start()
> .awaitTermination();
>
> kafka.conf = {
>"kafka.bootstrap.servers": "localhost:9092",
>"subscribe": "test-topic",
>"minOffsetsPerTrigger": 1000,
>"maxOffsetsPerTrigger": 1100,
>"maxTriggerDelay": "15m",
>"groupIdPrefix": "test",
>"startingOffsets": "latest",
>"includeHeaders": true,
>"failOnDataLoss": false
>   }
>
> spark.conf = {
>"spark.master": "spark://localhost:7077",
>"spark.app.name": "app",
>"spark.sql.streaming.kafka.useDeprecatedOffsetFetching": false,
>"spark.sql.streaming.metricsEnabled": true
>  }
>
>
> But these configs do not seem to be working as I can see Spark processing
> batches of 3k-15k immediately one after another. Is there something I am
> missing?
>
> Ref:
> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>
> Regards,
> Abhishek Singla
>


Re: config: minOffsetsPerTrigger not working

2023-04-27 Thread Abhishek Singla
Thanks, Mich for acknowledging.

Yes, I am providing the checkpoint path. I omitted it here in the code
snippet.

I believe this is because we are on Spark version 3.1.x; this config is only
available in versions 3.2.0 and above.

On Thu, Apr 27, 2023 at 9:26 PM Mich Talebzadeh 
wrote:

> Is this all of your writeStream?
>
> df.writeStream()
> .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
> .start()
> .awaitTermination();
>
> What happened to the checkpoint location?
>
> option('checkpointLocation', checkpoint_path).
>
> example
>
>  checkpoint_path = "file:///ssd/hduser/MDBatchBQ/chkpt"
>
>
> ls -l  /ssd/hduser/MDBatchBQ/chkpt
> total 24
> -rw-r--r--. 1 hduser hadoop   45 Mar  1 09:27 metadata
> drwxr-xr-x. 5 hduser hadoop 4096 Mar  1 09:27 .
> drwxr-xr-x. 4 hduser hadoop 4096 Mar  1 10:31 ..
> drwxr-xr-x. 3 hduser hadoop 4096 Apr 22 11:27 sources
> drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 offsets
> drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 commits
>
> so you can see what is going on
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 27 Apr 2023 at 15:46, Abhishek Singla 
> wrote:
>
>> Hi Team,
>>
>> I am using Spark Streaming to read from Kafka and write to S3.
>>
>> Version: 3.1.2
>> Scala Version: 2.12
>> Spark Kafka connector: spark-sql-kafka-0-10_2.12
>>
>> Dataset df =
>> spark
>> .readStream()
>> .format("kafka")
>> .options(appConfig.getKafka().getConf())
>> .load()
>> .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
>>
>> df.writeStream()
>> .foreachBatch(new KafkaS3PipelineImplementation(applicationId, 
>> appConfig))
>> .start()
>> .awaitTermination();
>>
>> kafka.conf = {
>>"kafka.bootstrap.servers": "localhost:9092",
>>"subscribe": "test-topic",
>>"minOffsetsPerTrigger": 1000,
>>"maxOffsetsPerTrigger": 1100,
>>"maxTriggerDelay": "15m",
>>"groupIdPrefix": "test",
>>"startingOffsets": "latest",
>>"includeHeaders": true,
>>"failOnDataLoss": false
>>   }
>>
>> spark.conf = {
>>"spark.master": "spark://localhost:7077",
>>"spark.app.name": "app",
>>"spark.sql.streaming.kafka.useDeprecatedOffsetFetching": 
>> false,
>>"spark.sql.streaming.metricsEnabled": true
>>  }
>>
>>
>> But these configs do not seem to be working as I can see Spark processing
>> batches of 3k-15k immediately one after another. Is there something I am
>> missing?
>>
>> Ref:
>> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>>
>> Regards,
>> Abhishek Singla
>>